summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/blob_db
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/blob_db/blob_compaction_filter.cc114
-rw-r--r--src/rocksdb/utilities/blob_db/blob_compaction_filter.h47
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db.cc103
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db.h227
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_impl.cc1922
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_impl.h425
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc108
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_iterator.h148
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_listener.h46
-rw-r--r--src/rocksdb/utilities/blob_db/blob_db_test.cc1695
-rw-r--r--src/rocksdb/utilities/blob_db/blob_dump_tool.cc279
-rw-r--r--src/rocksdb/utilities/blob_db/blob_dump_tool.h57
-rw-r--r--src/rocksdb/utilities/blob_db/blob_file.cc336
-rw-r--r--src/rocksdb/utilities/blob_db/blob_file.h214
-rw-r--r--src/rocksdb/utilities/blob_db/blob_index.h161
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_format.cc149
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_format.h125
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_reader.cc105
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_reader.h83
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_writer.cc139
-rw-r--r--src/rocksdb/utilities/blob_db/blob_log_writer.h95
21 files changed, 6578 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
new file mode 100644
index 00000000..f145d9a9
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
@@ -0,0 +1,114 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_compaction_filter.h"
+#include "db/dbformat.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+namespace {
+
+// CompactionFilter to delete expired blob index from base DB.
+class BlobIndexCompactionFilter : public CompactionFilter {
+ public:
+ BlobIndexCompactionFilter(BlobCompactionContext context,
+ uint64_t current_time, Statistics* statistics)
+ : context_(context),
+ current_time_(current_time),
+ statistics_(statistics) {}
+
+ ~BlobIndexCompactionFilter() override {
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
+ }
+
+ const char* Name() const override { return "BlobIndexCompactionFilter"; }
+
+ // Filter expired blob indexes regardless of snapshots.
+ bool IgnoreSnapshots() const override { return true; }
+
+ Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
+ const Slice& value, std::string* /*new_value*/,
+ std::string* /*skip_until*/) const override {
+ if (value_type != kBlobIndex) {
+ return Decision::kKeep;
+ }
+ BlobIndex blob_index;
+ Status s = blob_index.DecodeFrom(value);
+ if (!s.ok()) {
+ // Unable to decode blob index. Keeping the value.
+ return Decision::kKeep;
+ }
+ if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
+ // Expired
+ expired_count_++;
+ expired_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ if (!blob_index.IsInlined() &&
+ blob_index.file_number() < context_.next_file_number &&
+ context_.current_blob_files.count(blob_index.file_number()) == 0) {
+ // Corresponding blob file gone. Could have been garbage collected or
+ // evicted by FIFO eviction.
+ evicted_count_++;
+ evicted_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
+ blob_index.expiration() < context_.evict_expiration_up_to) {
+ // Hack: Internal key is passed to BlobIndexCompactionFilter for it to
+ // get sequence number.
+ ParsedInternalKey ikey;
+ bool ok = ParseInternalKey(key, &ikey);
+ // Remove keys that could have been remove by last FIFO eviction.
+ // If get error while parsing key, ignore and continue.
+ if (ok && ikey.sequence < context_.fifo_eviction_seq) {
+ evicted_count_++;
+ evicted_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ }
+ return Decision::kKeep;
+ }
+
+ private:
+ BlobCompactionContext context_;
+ const uint64_t current_time_;
+ Statistics* statistics_;
+ // It is safe to not using std::atomic since the compaction filter, created
+ // from a compaction filter factroy, will not be called from multiple threads.
+ mutable uint64_t expired_count_ = 0;
+ mutable uint64_t expired_size_ = 0;
+ mutable uint64_t evicted_count_ = 0;
+ mutable uint64_t evicted_size_ = 0;
+};
+
+} // anonymous namespace
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) {
+ int64_t current_time = 0;
+ Status s = env_->GetCurrentTime(&current_time);
+ if (!s.ok()) {
+ return nullptr;
+ }
+ assert(current_time >= 0);
+
+ BlobCompactionContext context;
+ blob_db_impl_->GetCompactionContext(&context);
+
+ return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
+ context, static_cast<uint64_t>(current_time), statistics_));
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.h b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
new file mode 100644
index 00000000..7a8ea613
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
@@ -0,0 +1,47 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <unordered_set>
+
+#include "monitoring/statistics.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/blob_db/blob_index.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+struct BlobCompactionContext {
+ uint64_t next_file_number;
+ std::unordered_set<uint64_t> current_blob_files;
+ SequenceNumber fifo_eviction_seq;
+ uint64_t evict_expiration_up_to;
+};
+
+class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
+ Statistics* statistics)
+ : blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
+
+ virtual const char* Name() const override {
+ return "BlobIndexCompactionFilterFactory";
+ }
+
+ virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override;
+
+ private:
+ BlobDBImpl* blob_db_impl_;
+ Env* env_;
+ Statistics* statistics_;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db.cc b/src/rocksdb/utilities/blob_db/blob_db.cc
new file mode 100644
index 00000000..d660def4
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db.cc
@@ -0,0 +1,103 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "utilities/blob_db/blob_db.h"
+
+#include <inttypes.h>
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
+ const std::string& dbname, BlobDB** blob_db) {
+ *blob_db = nullptr;
+ DBOptions db_options(options);
+ ColumnFamilyOptions cf_options(options);
+ std::vector<ColumnFamilyDescriptor> column_families;
+ column_families.push_back(
+ ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+ std::vector<ColumnFamilyHandle*> handles;
+ Status s = BlobDB::Open(db_options, bdb_options, dbname, column_families,
+ &handles, blob_db);
+ if (s.ok()) {
+ assert(handles.size() == 1);
+ // i can delete the handle since DBImpl is always holding a reference to
+ // default column family
+ delete handles[0];
+ }
+ return s;
+}
+
+Status BlobDB::Open(const DBOptions& db_options,
+ const BlobDBOptions& bdb_options, const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles,
+ BlobDB** blob_db) {
+ if (column_families.size() != 1 ||
+ column_families[0].name != kDefaultColumnFamilyName) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+
+ BlobDBImpl* blob_db_impl = new BlobDBImpl(dbname, bdb_options, db_options,
+ column_families[0].options);
+ Status s = blob_db_impl->Open(handles);
+ if (s.ok()) {
+ *blob_db = static_cast<BlobDB*>(blob_db_impl);
+ } else {
+ delete blob_db_impl;
+ *blob_db = nullptr;
+ }
+ return s;
+}
+
+BlobDB::BlobDB() : StackableDB(nullptr) {}
+
+void BlobDBOptions::Dump(Logger* log) const {
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.blob_dir: %s",
+ blob_dir.c_str());
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.path_relative: %d",
+ path_relative);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.is_fifo: %d",
+ is_fifo);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.max_db_size: %" PRIu64,
+ max_db_size);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.ttl_range_secs: %" PRIu64,
+ ttl_range_secs);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.min_blob_size: %" PRIu64,
+ min_blob_size);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.bytes_per_sync: %" PRIu64,
+ bytes_per_sync);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.blob_file_size: %" PRIu64,
+ blob_file_size);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.compression: %d",
+ static_cast<int>(compression));
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.enable_garbage_collection: %d",
+ enable_garbage_collection);
+ ROCKS_LOG_HEADER(
+ log, " BlobDBOptions.disable_background_tasks: %d",
+ disable_background_tasks);
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif
diff --git a/src/rocksdb/utilities/blob_db/blob_db.h b/src/rocksdb/utilities/blob_db/blob_db.h
new file mode 100644
index 00000000..3beb74fc
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db.h
@@ -0,0 +1,227 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <string>
+#include <vector>
+#include "rocksdb/db.h"
+#include "rocksdb/status.h"
+#include "rocksdb/utilities/stackable_db.h"
+
+namespace rocksdb {
+
+namespace blob_db {
+
+// A wrapped database which puts values of KV pairs in a separate log
+// and store location to the log in the underlying DB.
+// It lacks lots of importatant functionalities, e.g. DB restarts,
+// garbage collection, iterators, etc.
+//
+// The factory needs to be moved to include/rocksdb/utilities to allow
+// users to use blob DB.
+
+struct BlobDBOptions {
+ // name of the directory under main db, where blobs will be stored.
+ // default is "blob_dir"
+ std::string blob_dir = "blob_dir";
+
+ // whether the blob_dir path is relative or absolute.
+ bool path_relative = true;
+
+ // When max_db_size is reached, evict blob files to free up space
+ // instead of returnning NoSpace error on write. Blob files will be
+ // evicted from oldest to newest, based on file creation time.
+ bool is_fifo = false;
+
+ // Maximum size of the database (including SST files and blob files).
+ //
+ // Default: 0 (no limits)
+ uint64_t max_db_size = 0;
+
+ // a new bucket is opened, for ttl_range. So if ttl_range is 600seconds
+ // (10 minutes), and the first bucket starts at 1471542000
+ // then the blob buckets will be
+ // first bucket is 1471542000 - 1471542600
+ // second bucket is 1471542600 - 1471543200
+ // and so on
+ uint64_t ttl_range_secs = 3600;
+
+ // The smallest value to store in blob log. Values smaller than this threshold
+ // will be inlined in base DB together with the key.
+ uint64_t min_blob_size = 0;
+
+ // Allows OS to incrementally sync blob files to disk for every
+ // bytes_per_sync bytes written. Users shouldn't rely on it for
+ // persistency guarantee.
+ uint64_t bytes_per_sync = 512 * 1024;
+
+ // the target size of each blob file. File will become immutable
+ // after it exceeds that size
+ uint64_t blob_file_size = 256 * 1024 * 1024;
+
+ // what compression to use for Blob's
+ CompressionType compression = kNoCompression;
+
+ // If enabled, blob DB periodically cleanup stale data by rewriting remaining
+ // live data in blob files to new files. If garbage collection is not enabled,
+ // blob files will be cleanup based on TTL.
+ bool enable_garbage_collection = false;
+
+ // Disable all background job. Used for test only.
+ bool disable_background_tasks = false;
+
+ void Dump(Logger* log) const;
+};
+
+class BlobDB : public StackableDB {
+ public:
+ using rocksdb::StackableDB::Put;
+ virtual Status Put(const WriteOptions& options, const Slice& key,
+ const Slice& value) override = 0;
+ virtual Status Put(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) override {
+ if (column_family != DefaultColumnFamily()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ return Put(options, key, value);
+ }
+
+ using rocksdb::StackableDB::Delete;
+ virtual Status Delete(const WriteOptions& options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key) override {
+ if (column_family != DefaultColumnFamily()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ assert(db_ != nullptr);
+ return db_->Delete(options, column_family, key);
+ }
+
+ virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t ttl) = 0;
+ virtual Status PutWithTTL(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value, uint64_t ttl) {
+ if (column_family != DefaultColumnFamily()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ return PutWithTTL(options, key, value, ttl);
+ }
+
+ // Put with expiration. Key with expiration time equal to
+ // std::numeric_limits<uint64_t>::max() means the key don't expire.
+ virtual Status PutUntil(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration) = 0;
+ virtual Status PutUntil(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value, uint64_t expiration) {
+ if (column_family != DefaultColumnFamily()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ return PutUntil(options, key, value, expiration);
+ }
+
+ using rocksdb::StackableDB::Get;
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) override = 0;
+
+ // Get value and expiration.
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) = 0;
+ virtual Status Get(const ReadOptions& options, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) {
+ return Get(options, DefaultColumnFamily(), key, value, expiration);
+ }
+
+ using rocksdb::StackableDB::MultiGet;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& options,
+ const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override = 0;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override {
+ for (auto column_family : column_families) {
+ if (column_family != DefaultColumnFamily()) {
+ return std::vector<Status>(
+ column_families.size(),
+ Status::NotSupported(
+ "Blob DB doesn't support non-default column family."));
+ }
+ }
+ return MultiGet(options, keys, values);
+ }
+
+ using rocksdb::StackableDB::SingleDelete;
+ virtual Status SingleDelete(const WriteOptions& /*wopts*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ using rocksdb::StackableDB::Merge;
+ virtual Status Merge(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/, const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ virtual Status Write(const WriteOptions& opts,
+ WriteBatch* updates) override = 0;
+ using rocksdb::StackableDB::NewIterator;
+ virtual Iterator* NewIterator(const ReadOptions& options) override = 0;
+ virtual Iterator* NewIterator(const ReadOptions& options,
+ ColumnFamilyHandle* column_family) override {
+ if (column_family != DefaultColumnFamily()) {
+ // Blob DB doesn't support non-default column family.
+ return nullptr;
+ }
+ return NewIterator(options);
+ }
+
+ using rocksdb::StackableDB::Close;
+ virtual Status Close() override = 0;
+
+ // Opening blob db.
+ static Status Open(const Options& options, const BlobDBOptions& bdb_options,
+ const std::string& dbname, BlobDB** blob_db);
+
+ static Status Open(const DBOptions& db_options,
+ const BlobDBOptions& bdb_options,
+ const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles,
+ BlobDB** blob_db);
+
+ virtual BlobDBOptions GetBlobDBOptions() const = 0;
+
+ virtual Status SyncBlobFiles() = 0;
+
+ virtual ~BlobDB() {}
+
+ protected:
+ explicit BlobDB();
+};
+
+// Destroy the content of the database.
+Status DestroyBlobDB(const std::string& dbname, const Options& options,
+ const BlobDBOptions& bdb_options);
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.cc b/src/rocksdb/utilities/blob_db/blob_db_impl.cc
new file mode 100644
index 00000000..5dcddc21
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl.cc
@@ -0,0 +1,1922 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db_impl.h"
+#include <algorithm>
+#include <cinttypes>
+#include <iomanip>
+#include <memory>
+
+#include "db/db_impl.h"
+#include "db/write_batch_internal.h"
+#include "monitoring/instrumented_mutex.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/utilities/stackable_db.h"
+#include "rocksdb/utilities/transaction.h"
+#include "table/block.h"
+#include "table/block_based_table_builder.h"
+#include "table/block_builder.h"
+#include "table/meta_blocks.h"
+#include "util/cast_util.h"
+#include "util/crc32c.h"
+#include "util/file_reader_writer.h"
+#include "util/file_util.h"
+#include "util/filename.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/stop_watch.h"
+#include "util/sync_point.h"
+#include "util/timer_queue.h"
+#include "utilities/blob_db/blob_compaction_filter.h"
+#include "utilities/blob_db/blob_db_iterator.h"
+#include "utilities/blob_db/blob_db_listener.h"
+#include "utilities/blob_db/blob_index.h"
+
+namespace {
+int kBlockBasedTableVersionFormat = 2;
+} // end namespace
+
+namespace rocksdb {
+namespace blob_db {
+
+bool BlobFileComparator::operator()(
+ const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const {
+ return lhs->BlobFileNumber() > rhs->BlobFileNumber();
+}
+
+bool BlobFileComparatorTTL::operator()(
+ const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const {
+ assert(lhs->HasTTL() && rhs->HasTTL());
+ if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
+ return true;
+ }
+ if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
+ return false;
+ }
+ return lhs->BlobFileNumber() < rhs->BlobFileNumber();
+}
+
+BlobDBImpl::BlobDBImpl(const std::string& dbname,
+ const BlobDBOptions& blob_db_options,
+ const DBOptions& db_options,
+ const ColumnFamilyOptions& cf_options)
+ : BlobDB(),
+ dbname_(dbname),
+ db_impl_(nullptr),
+ env_(db_options.env),
+ bdb_options_(blob_db_options),
+ db_options_(db_options),
+ cf_options_(cf_options),
+ env_options_(db_options),
+ statistics_(db_options_.statistics.get()),
+ next_file_number_(1),
+ epoch_of_(0),
+ closed_(true),
+ open_file_count_(0),
+ total_blob_size_(0),
+ live_sst_size_(0),
+ fifo_eviction_seq_(0),
+ evict_expiration_up_to_(0),
+ debug_level_(0) {
+ blob_dir_ = (bdb_options_.path_relative)
+ ? dbname + "/" + bdb_options_.blob_dir
+ : bdb_options_.blob_dir;
+ env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
+}
+
+BlobDBImpl::~BlobDBImpl() {
+ tqueue_.shutdown();
+ // CancelAllBackgroundWork(db_, true);
+ Status s __attribute__((__unused__)) = Close();
+ assert(s.ok());
+}
+
+Status BlobDBImpl::Close() {
+ if (closed_) {
+ return Status::OK();
+ }
+ closed_ = true;
+
+ // Close base DB before BlobDBImpl destructs to stop event listener and
+ // compaction filter call.
+ Status s = db_->Close();
+ // delete db_ anyway even if close failed.
+ delete db_;
+ // Reset pointers to avoid StackableDB delete the pointer again.
+ db_ = nullptr;
+ db_impl_ = nullptr;
+ if (!s.ok()) {
+ return s;
+ }
+
+ s = SyncBlobFiles();
+ return s;
+}
+
+BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
+
+Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
+ assert(handles != nullptr);
+ assert(db_ == nullptr);
+ if (blob_dir_.empty()) {
+ return Status::NotSupported("No blob directory in options");
+ }
+ if (cf_options_.compaction_filter != nullptr ||
+ cf_options_.compaction_filter_factory != nullptr) {
+ return Status::NotSupported("Blob DB doesn't support compaction filter.");
+ }
+
+ Status s;
+
+ // Create info log.
+ if (db_options_.info_log == nullptr) {
+ s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
+
+ // Open blob directory.
+ s = env_->CreateDirIfMissing(blob_dir_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to create blob_dir %s, status: %s",
+ blob_dir_.c_str(), s.ToString().c_str());
+ }
+ s = env_->NewDirectory(blob_dir_, &dir_ent_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
+ s.ToString().c_str());
+ return s;
+ }
+
+ // Open blob files.
+ s = OpenAllBlobFiles();
+ if (!s.ok()) {
+ return s;
+ }
+
+ // Update options
+ db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
+ cf_options_.compaction_filter_factory.reset(
+ new BlobIndexCompactionFilterFactory(this, env_, statistics_));
+
+ // Open base db.
+ ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
+ s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
+ if (!s.ok()) {
+ return s;
+ }
+ db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
+
+ // Add trash files in blob dir to file delete scheduler.
+ SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
+ db_impl_->immutable_db_options().sst_file_manager.get());
+ DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
+
+ UpdateLiveSSTSize();
+
+ // Start background jobs.
+ if (!bdb_options_.disable_background_tasks) {
+ StartBackgroundTasks();
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
+ bdb_options_.Dump(db_options_.info_log.get());
+ closed_ = false;
+ return s;
+}
+
+void BlobDBImpl::StartBackgroundTasks() {
+ // store a call to a member function and object
+ tqueue_.add(
+ kReclaimOpenFilesPeriodMillisecs,
+ std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
+ tqueue_.add(
+ kDeleteObsoleteFilesPeriodMillisecs,
+ std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
+ tqueue_.add(kSanityCheckPeriodMillisecs,
+ std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
+ tqueue_.add(
+ kEvictExpiredFilesPeriodMillisecs,
+ std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
+}
+
+Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
+ assert(file_numbers != nullptr);
+ std::vector<std::string> all_files;
+ Status s = env_->GetChildren(blob_dir_, &all_files);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to get list of blob files, status: %s",
+ s.ToString().c_str());
+ return s;
+ }
+
+ for (const auto& file_name : all_files) {
+ uint64_t file_number;
+ FileType type;
+ bool success = ParseFileName(file_name, &file_number, &type);
+ if (success && type == kBlobFile) {
+ file_numbers->insert(file_number);
+ } else {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Skipping file in blob directory: %s", file_name.c_str());
+ }
+ }
+
+ return s;
+}
+
+Status BlobDBImpl::OpenAllBlobFiles() {
+ std::set<uint64_t> file_numbers;
+ Status s = GetAllBlobFiles(&file_numbers);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (!file_numbers.empty()) {
+ next_file_number_.store(*file_numbers.rbegin() + 1);
+ }
+
+ std::string blob_file_list;
+ std::string obsolete_file_list;
+
+ for (auto& file_number : file_numbers) {
+ std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
+ this, blob_dir_, file_number, db_options_.info_log.get());
+ blob_file->MarkImmutable();
+
+ // Read file header and footer
+ Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
+ if (read_metadata_status.IsCorruption()) {
+ // Remove incomplete file.
+ ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
+ if (!obsolete_file_list.empty()) {
+ obsolete_file_list.append(", ");
+ }
+ obsolete_file_list.append(ToString(file_number));
+ continue;
+ } else if (!read_metadata_status.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Unable to read metadata of blob file %" PRIu64
+ ", status: '%s'",
+ file_number, read_metadata_status.ToString().c_str());
+ return read_metadata_status;
+ }
+
+ total_blob_size_ += blob_file->GetFileSize();
+
+ blob_files_[file_number] = blob_file;
+ if (!blob_file_list.empty()) {
+ blob_file_list.append(", ");
+ }
+ blob_file_list.append(ToString(file_number));
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
+ blob_file_list.c_str());
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Found %" ROCKSDB_PRIszt
+ " incomplete or corrupted blob files: %s",
+ obsolete_files_.size(), obsolete_file_list.c_str());
+ return s;
+}
+
+void BlobDBImpl::CloseRandomAccessLocked(
+ const std::shared_ptr<BlobFile>& bfile) {
+ bfile->CloseRandomAccessLocked();
+ open_file_count_--;
+}
+
+Status BlobDBImpl::GetBlobFileReader(
+ const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<RandomAccessFileReader>* reader) {
+ assert(reader != nullptr);
+ bool fresh_open = false;
+ Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
+ if (s.ok() && fresh_open) {
+ assert(*reader != nullptr);
+ open_file_count_++;
+ }
+ return s;
+}
+
+std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
+ uint64_t file_num = next_file_number_++;
+ auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
+ db_options_.info_log.get());
+ ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
+ bfile->PathName().c_str(), reason.c_str());
+ LogFlush(db_options_.info_log);
+ return bfile;
+}
+
+Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
+ std::string fpath(bfile->PathName());
+ std::unique_ptr<WritableFile> wfile;
+
+ Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to open blob file for write: %s status: '%s'"
+ " exists: '%s'",
+ fpath.c_str(), s.ToString().c_str(),
+ env_->FileExists(fpath).ToString().c_str());
+ return s;
+ }
+
+ std::unique_ptr<WritableFileWriter> fwriter;
+ fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, env_options_));
+
+ uint64_t boffset = bfile->GetFileSize();
+ if (debug_level_ >= 2 && boffset) {
+ ROCKS_LOG_DEBUG(db_options_.info_log,
+ "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
+ boffset);
+ }
+
+ Writer::ElemType et = Writer::kEtNone;
+ if (bfile->file_size_ == BlobLogHeader::kSize) {
+ et = Writer::kEtFileHdr;
+ } else if (bfile->file_size_ > BlobLogHeader::kSize) {
+ et = Writer::kEtRecord;
+ } else if (bfile->file_size_) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Open blob file: %s with wrong size: %" PRIu64,
+ fpath.c_str(), boffset);
+ return Status::Corruption("Invalid blob file size");
+ }
+
+ bfile->log_writer_ = std::make_shared<Writer>(
+ std::move(fwriter), env_, statistics_, bfile->file_number_,
+ bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
+ bfile->log_writer_->last_elem_type_ = et;
+
+ return s;
+}
+
+std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
+ uint64_t expiration) const {
+ if (open_ttl_files_.empty()) {
+ return nullptr;
+ }
+
+ std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
+ tmp->SetHasTTL(true);
+ tmp->expiration_range_ = std::make_pair(expiration, 0);
+ tmp->file_number_ = std::numeric_limits<uint64_t>::max();
+
+ auto citr = open_ttl_files_.equal_range(tmp);
+ if (citr.first == open_ttl_files_.end()) {
+ assert(citr.second == open_ttl_files_.end());
+
+ std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
+ return (check->expiration_range_.second <= expiration) ? nullptr : check;
+ }
+
+ if (citr.first != citr.second) {
+ return *(citr.first);
+ }
+
+ auto finditr = citr.second;
+ if (finditr != open_ttl_files_.begin()) {
+ --finditr;
+ }
+
+ bool b2 = (*finditr)->expiration_range_.second <= expiration;
+ bool b1 = (*finditr)->expiration_range_.first > expiration;
+
+ return (b1 || b2) ? nullptr : (*finditr);
+}
+
+Status BlobDBImpl::CheckOrCreateWriterLocked(
+ const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<Writer>* writer) {
+ assert(writer != nullptr);
+ *writer = blob_file->GetWriter();
+ if (*writer != nullptr) {
+ return Status::OK();
+ }
+ Status s = CreateWriterLocked(blob_file);
+ if (s.ok()) {
+ *writer = blob_file->GetWriter();
+ }
+ return s;
+}
+
+Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
+ assert(blob_file != nullptr);
+ {
+ ReadLock rl(&mutex_);
+ if (open_non_ttl_file_ != nullptr) {
+ *blob_file = open_non_ttl_file_;
+ return Status::OK();
+ }
+ }
+
+ // CHECK again
+ WriteLock wl(&mutex_);
+ if (open_non_ttl_file_ != nullptr) {
+ *blob_file = open_non_ttl_file_;
+ return Status::OK();
+ }
+
+ *blob_file = NewBlobFile("SelectBlobFile");
+ assert(*blob_file != nullptr);
+
+ // file not visible, hence no lock
+ std::shared_ptr<Writer> writer;
+ Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to get writer from blob file: %s, error: %s",
+ (*blob_file)->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ (*blob_file)->file_size_ = BlobLogHeader::kSize;
+ (*blob_file)->header_.compression = bdb_options_.compression;
+ (*blob_file)->header_.has_ttl = false;
+ (*blob_file)->header_.column_family_id =
+ reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ (*blob_file)->header_valid_ = true;
+ (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
+ (*blob_file)->SetHasTTL(false);
+ (*blob_file)->SetCompression(bdb_options_.compression);
+
+ s = writer->WriteHeader((*blob_file)->header_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to write header to new blob file: %s"
+ " status: '%s'",
+ (*blob_file)->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ blob_files_.insert(
+ std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
+ open_non_ttl_file_ = *blob_file;
+ total_blob_size_ += BlobLogHeader::kSize;
+ return s;
+}
+
+Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
+ std::shared_ptr<BlobFile>* blob_file) {
+ assert(blob_file != nullptr);
+ assert(expiration != kNoExpiration);
+ uint64_t epoch_read = 0;
+ {
+ ReadLock rl(&mutex_);
+ *blob_file = FindBlobFileLocked(expiration);
+ epoch_read = epoch_of_.load();
+ }
+
+ if (*blob_file != nullptr) {
+ assert(!(*blob_file)->Immutable());
+ return Status::OK();
+ }
+
+ uint64_t exp_low =
+ (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
+ uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
+ ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
+
+ *blob_file = NewBlobFile("SelectBlobFileTTL");
+ assert(*blob_file != nullptr);
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "New blob file TTL range: %s %" PRIu64 " %" PRIu64,
+ (*blob_file)->PathName().c_str(), exp_low, exp_high);
+ LogFlush(db_options_.info_log);
+
+ // we don't need to take lock as no other thread is seeing bfile yet
+ std::shared_ptr<Writer> writer;
+ Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Failed to get writer from blob file with TTL: %s, error: %s",
+ (*blob_file)->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ (*blob_file)->header_.expiration_range = expiration_range;
+ (*blob_file)->header_.compression = bdb_options_.compression;
+ (*blob_file)->header_.has_ttl = true;
+ (*blob_file)->header_.column_family_id =
+ reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ (*blob_file)->header_valid_ = true;
+ (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
+ (*blob_file)->SetHasTTL(true);
+ (*blob_file)->SetCompression(bdb_options_.compression);
+ (*blob_file)->file_size_ = BlobLogHeader::kSize;
+
+ // set the first value of the range, since that is
+ // concrete at this time. also necessary to add to open_ttl_files_
+ (*blob_file)->expiration_range_ = expiration_range;
+
+ WriteLock wl(&mutex_);
+ // in case the epoch has shifted in the interim, then check
+ // check condition again - should be rare.
+ if (epoch_of_.load() != epoch_read) {
+ std::shared_ptr<BlobFile> blob_file2 = FindBlobFileLocked(expiration);
+ if (blob_file2 != nullptr) {
+ *blob_file = std::move(blob_file2);
+ return Status::OK();
+ }
+ }
+
+ s = writer->WriteHeader((*blob_file)->header_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to write header to new blob file: %s"
+ " status: '%s'",
+ (*blob_file)->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ blob_files_.insert(
+ std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
+ open_ttl_files_.insert(*blob_file);
+ total_blob_size_ += BlobLogHeader::kSize;
+ epoch_of_++;
+
+ return s;
+}
+
+class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
+ private:
+ const WriteOptions& options_;
+ BlobDBImpl* blob_db_impl_;
+ uint32_t default_cf_id_;
+ WriteBatch batch_;
+
+ public:
+ BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
+ uint32_t default_cf_id)
+ : options_(options),
+ blob_db_impl_(blob_db_impl),
+ default_cf_id_(default_cf_id) {}
+
+ WriteBatch* batch() { return &batch_; }
+
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override {
+ if (column_family_id != default_cf_id_) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
+ &batch_);
+ return s;
+ }
+
+ Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
+ if (column_family_id != default_cf_id_) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
+ return s;
+ }
+
+ virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
+ const Slice& end_key) {
+ if (column_family_id != default_cf_id_) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
+ begin_key, end_key);
+ return s;
+ }
+
+ Status SingleDeleteCF(uint32_t /*column_family_id*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in blob db.");
+ }
+
+ void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
+};
+
+Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
+ StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_WRITE);
+ uint32_t default_cf_id =
+ reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ Status s;
+ BlobInserter blob_inserter(options, this, default_cf_id);
+ {
+ // Release write_mutex_ before DB write to avoid race condition with
+ // flush begin listener, which also require write_mutex_ to sync
+ // blob files.
+ MutexLock l(&write_mutex_);
+ s = updates->Iterate(&blob_inserter);
+ }
+ if (!s.ok()) {
+ return s;
+ }
+ return db_->Write(options, blob_inserter.batch());
+}
+
+Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
+ const Slice& value) {
+ return PutUntil(options, key, value, kNoExpiration);
+}
+
+Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
+ const Slice& key, const Slice& value,
+ uint64_t ttl) {
+ uint64_t now = EpochNow();
+ uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
+ return PutUntil(options, key, value, expiration);
+}
+
+Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration) {
+ StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_PUT);
+ TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
+ Status s;
+ WriteBatch batch;
+ {
+ // Release write_mutex_ before DB write to avoid race condition with
+ // flush begin listener, which also require write_mutex_ to sync
+ // blob files.
+ MutexLock l(&write_mutex_);
+ s = PutBlobValue(options, key, value, expiration, &batch);
+ }
+ if (s.ok()) {
+ s = db_->Write(options, &batch);
+ }
+ TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
+ return s;
+}
+
+Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
+ const Slice& key, const Slice& value,
+ uint64_t expiration, WriteBatch* batch) {
+ write_mutex_.AssertHeld();
+ Status s;
+ std::string index_entry;
+ uint32_t column_family_id =
+ reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+ if (value.size() < bdb_options_.min_blob_size) {
+ if (expiration == kNoExpiration) {
+ // Put as normal value
+ s = batch->Put(key, value);
+ RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
+ } else {
+ // Inlined with TTL
+ BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
+ s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
+ index_entry);
+ RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
+ }
+ } else {
+ std::string compression_output;
+ Slice value_compressed = GetCompressedSlice(value, &compression_output);
+
+ std::string headerbuf;
+ Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
+
+ // Check DB size limit before selecting blob file to
+ // Since CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
+ // done before calling SelectBlobFile().
+ s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
+ value_compressed.size());
+ if (!s.ok()) {
+ return s;
+ }
+
+ std::shared_ptr<BlobFile> blob_file;
+ if (expiration != kNoExpiration) {
+ s = SelectBlobFileTTL(expiration, &blob_file);
+ } else {
+ s = SelectBlobFile(&blob_file);
+ }
+ if (s.ok()) {
+ assert(blob_file != nullptr);
+ assert(blob_file->compression() == bdb_options_.compression);
+ s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
+ &index_entry);
+ }
+ if (s.ok()) {
+ if (expiration != kNoExpiration) {
+ blob_file->ExtendExpirationRange(expiration);
+ }
+ s = CloseBlobFileIfNeeded(blob_file);
+ }
+ if (s.ok()) {
+ s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
+ index_entry);
+ }
+ if (s.ok()) {
+ if (expiration == kNoExpiration) {
+ RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
+ } else {
+ RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
+ }
+ } else {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
+ " status: '%s' blob_file: '%s'",
+ blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
+ s.ToString().c_str(), blob_file->DumpState().c_str());
+ }
+ }
+
+ RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
+ RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
+ RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
+ RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
+
+ return s;
+}
+
+Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
+ std::string* compression_output) const {
+ if (bdb_options_.compression == kNoCompression) {
+ return raw;
+ }
+ StopWatch compression_sw(env_, statistics_, BLOB_DB_COMPRESSION_MICROS);
+ CompressionType type = bdb_options_.compression;
+ CompressionOptions opts;
+ CompressionContext context(type);
+ CompressionInfo info(opts, context, CompressionDict::GetEmptyDict(), type,
+ 0 /* sample_for_compression */);
+ CompressBlock(raw, info, &type, kBlockBasedTableVersionFormat, false,
+ compression_output, nullptr, nullptr);
+ return *compression_output;
+}
+
+void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
+ ReadLock l(&mutex_);
+
+ context->next_file_number = next_file_number_.load();
+ context->current_blob_files.clear();
+ for (auto& p : blob_files_) {
+ context->current_blob_files.insert(p.first);
+ }
+ context->fifo_eviction_seq = fifo_eviction_seq_;
+ context->evict_expiration_up_to = evict_expiration_up_to_;
+}
+
+void BlobDBImpl::UpdateLiveSSTSize() {
+ uint64_t live_sst_size = 0;
+ bool ok = GetIntProperty(DB::Properties::kLiveSstFilesSize, &live_sst_size);
+ if (ok) {
+ live_sst_size_.store(live_sst_size);
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Updated total SST file size: %" PRIu64 " bytes.",
+ live_sst_size);
+ } else {
+ ROCKS_LOG_ERROR(
+ db_options_.info_log,
+ "Failed to update total SST file size after flush or compaction.");
+ }
+ {
+ // Trigger FIFO eviction if needed.
+ MutexLock l(&write_mutex_);
+ Status s = CheckSizeAndEvictBlobFiles(0, true /*force*/);
+ if (s.IsNoSpace()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "DB grow out-of-space after SST size updated. Current live"
+ " SST size: %" PRIu64
+ " , current blob files size: %" PRIu64 ".",
+ live_sst_size_.load(), total_blob_size_.load());
+ }
+ }
+}
+
+Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
+ bool force_evict) {
+ write_mutex_.AssertHeld();
+
+ uint64_t live_sst_size = live_sst_size_.load();
+ if (bdb_options_.max_db_size == 0 ||
+ live_sst_size + total_blob_size_.load() + blob_size <=
+ bdb_options_.max_db_size) {
+ return Status::OK();
+ }
+
+ if (bdb_options_.is_fifo == false ||
+ (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) {
+ // FIFO eviction is disabled, or no space to insert new blob even we evict
+ // all blob files.
+ return Status::NoSpace(
+ "Write failed, as writing it would exceed max_db_size limit.");
+ }
+
+ std::vector<std::shared_ptr<BlobFile>> candidate_files;
+ CopyBlobFiles(&candidate_files);
+ std::sort(candidate_files.begin(), candidate_files.end(),
+ BlobFileComparator());
+ fifo_eviction_seq_ = GetLatestSequenceNumber();
+
+ WriteLock l(&mutex_);
+
+ while (!candidate_files.empty() &&
+ live_sst_size + total_blob_size_.load() + blob_size >
+ bdb_options_.max_db_size) {
+ std::shared_ptr<BlobFile> blob_file = candidate_files.back();
+ candidate_files.pop_back();
+ WriteLock file_lock(&blob_file->mutex_);
+ if (blob_file->Obsolete()) {
+ // File already obsoleted by someone else.
+ continue;
+ }
+ // FIFO eviction can evict open blob files.
+ if (!blob_file->Immutable()) {
+ Status s = CloseBlobFile(blob_file, false /*need_lock*/);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ assert(blob_file->Immutable());
+ auto expiration_range = blob_file->GetExpirationRange();
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Evict oldest blob file since DB out of space. Current "
+ "live SST file size: %" PRIu64 ", total blob size: %" PRIu64
+ ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64
+ ".",
+ live_sst_size, total_blob_size_.load(),
+ bdb_options_.max_db_size, blob_file->BlobFileNumber());
+ ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/);
+ evict_expiration_up_to_ = expiration_range.first;
+ RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED);
+ RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED,
+ blob_file->BlobCount());
+ RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED,
+ blob_file->GetFileSize());
+ TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted");
+ }
+ if (live_sst_size + total_blob_size_.load() + blob_size >
+ bdb_options_.max_db_size) {
+ return Status::NoSpace(
+ "Write failed, as writing it would exceed max_db_size limit.");
+ }
+ return Status::OK();
+}
+
+Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
+ const std::string& headerbuf, const Slice& key,
+ const Slice& value, uint64_t expiration,
+ std::string* index_entry) {
+ Status s;
+ uint64_t blob_offset = 0;
+ uint64_t key_offset = 0;
+ {
+ WriteLock lockbfile_w(&bfile->mutex_);
+ std::shared_ptr<Writer> writer;
+ s = CheckOrCreateWriterLocked(bfile, &writer);
+ if (!s.ok()) {
+ return s;
+ }
+
+ // write the blob to the blob log.
+ s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
+ &blob_offset);
+ }
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Invalid status in AppendBlob: %s status: '%s'",
+ bfile->PathName().c_str(), s.ToString().c_str());
+ return s;
+ }
+
+ // increment blob count
+ bfile->blob_count_++;
+
+ uint64_t size_put = headerbuf.size() + key.size() + value.size();
+ bfile->file_size_ += size_put;
+ total_blob_size_ += size_put;
+
+ if (expiration == kNoExpiration) {
+ BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
+ value.size(), bdb_options_.compression);
+ } else {
+ BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
+ blob_offset, value.size(),
+ bdb_options_.compression);
+ }
+
+ return s;
+}
+
+std::vector<Status> BlobDBImpl::MultiGet(
+ const ReadOptions& read_options,
+ const std::vector<Slice>& keys, std::vector<std::string>* values) {
+ StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
+ // Get a snapshot to avoid blob file get deleted between we
+ // fetch and index entry and reading from the file.
+ ReadOptions ro(read_options);
+ bool snapshot_created = SetSnapshotIfNeeded(&ro);
+
+ std::vector<Status> statuses;
+ statuses.reserve(keys.size());
+ values->clear();
+ values->reserve(keys.size());
+ PinnableSlice value;
+ for (size_t i = 0; i < keys.size(); i++) {
+ statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
+ values->push_back(value.ToString());
+ value.Reset();
+ }
+ if (snapshot_created) {
+ db_->ReleaseSnapshot(ro.snapshot);
+ }
+ return statuses;
+}
+
+bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
+ assert(read_options != nullptr);
+ if (read_options->snapshot != nullptr) {
+ return false;
+ }
+ read_options->snapshot = db_->GetSnapshot();
+ return true;
+}
+
+Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value, uint64_t* expiration) {
+ assert(value != nullptr);
+ BlobIndex blob_index;
+ Status s = blob_index.DecodeFrom(index_entry);
+ if (!s.ok()) {
+ return s;
+ }
+ if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
+ return Status::NotFound("Key expired");
+ }
+ if (expiration != nullptr) {
+ if (blob_index.HasTTL()) {
+ *expiration = blob_index.expiration();
+ } else {
+ *expiration = kNoExpiration;
+ }
+ }
+ if (blob_index.IsInlined()) {
+ // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
+ // memory buffer to avoid extra copy.
+ value->PinSelf(blob_index.value());
+ return Status::OK();
+ }
+ if (blob_index.size() == 0) {
+ value->PinSelf("");
+ return Status::OK();
+ }
+
+ // offset has to have certain min, as we will read CRC
+ // later from the Blob Header, which needs to be also a
+ // valid offset.
+ if (blob_index.offset() <
+ (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
+ if (debug_level_ >= 2) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Invalid blob index file_number: %" PRIu64
+ " blob_offset: %" PRIu64 " blob_size: %" PRIu64
+ " key: %s",
+ blob_index.file_number(), blob_index.offset(),
+ blob_index.size(), key.data());
+ }
+ return Status::NotFound("Invalid blob offset");
+ }
+
+ std::shared_ptr<BlobFile> bfile;
+ {
+ ReadLock rl(&mutex_);
+ auto hitr = blob_files_.find(blob_index.file_number());
+
+ // file was deleted
+ if (hitr == blob_files_.end()) {
+ return Status::NotFound("Blob Not Found as blob file missing");
+ }
+
+ bfile = hitr->second;
+ }
+
+ if (blob_index.size() == 0 && value != nullptr) {
+ value->PinSelf("");
+ return Status::OK();
+ }
+
+ // takes locks when called
+ std::shared_ptr<RandomAccessFileReader> reader;
+ s = GetBlobFileReader(bfile, &reader);
+ if (!s.ok()) {
+ return s;
+ }
+
+ assert(blob_index.offset() > key.size() + sizeof(uint32_t));
+ uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
+ uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();
+
+ // Allocate the buffer. This is safe in C++11
+ std::string buffer_str(static_cast<size_t>(record_size), static_cast<char>(0));
+ char* buffer = &buffer_str[0];
+
+ // A partial blob record contain checksum, key and value.
+ Slice blob_record;
+ {
+ StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+ s = reader->Read(record_offset, static_cast<size_t>(record_size), &blob_record, buffer);
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
+ }
+ if (!s.ok()) {
+ ROCKS_LOG_DEBUG(db_options_.info_log,
+ "Failed to read blob from blob file %" PRIu64
+ ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
+ ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
+ bfile->BlobFileNumber(), blob_index.offset(),
+ blob_index.size(), key.size(), s.ToString().c_str());
+ return s;
+ }
+ if (blob_record.size() != record_size) {
+ ROCKS_LOG_DEBUG(
+ db_options_.info_log,
+ "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
+ ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
+ ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
+ bfile->BlobFileNumber(), blob_index.offset(), blob_index.size(),
+ key.size(), blob_record.size(), record_size);
+
+ return Status::Corruption("Failed to retrieve blob from blob index.");
+ }
+ Slice crc_slice(blob_record.data(), sizeof(uint32_t));
+ Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
+ static_cast<size_t>(blob_index.size()));
+ uint32_t crc_exp;
+ if (!GetFixed32(&crc_slice, &crc_exp)) {
+ ROCKS_LOG_DEBUG(db_options_.info_log,
+ "Unable to decode CRC from blob file %" PRIu64
+ ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
+ ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
+ bfile->BlobFileNumber(), blob_index.offset(),
+ blob_index.size(), key.size(), s.ToString().c_str());
+ return Status::Corruption("Unable to decode checksum.");
+ }
+
+ uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
+ blob_record.size() - sizeof(uint32_t));
+ crc = crc32c::Mask(crc); // Adjust for storage
+ if (crc != crc_exp) {
+ if (debug_level_ >= 2) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Blob crc mismatch file: %s blob_offset: %" PRIu64
+ " blob_size: %" PRIu64 " key: %s status: '%s'",
+ bfile->PathName().c_str(), blob_index.offset(),
+ blob_index.size(), key.data(), s.ToString().c_str());
+ }
+ return Status::Corruption("Corruption. Blob CRC mismatch");
+ }
+
+ if (bfile->compression() == kNoCompression) {
+ value->PinSelf(blob_value);
+ } else {
+ BlockContents contents;
+ auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
+ {
+ StopWatch decompression_sw(env_, statistics_,
+ BLOB_DB_DECOMPRESSION_MICROS);
+ UncompressionContext context(bfile->compression());
+ UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+ bfile->compression());
+ s = UncompressBlockContentsForCompressionType(
+ info, blob_value.data(), blob_value.size(), &contents,
+ kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
+ }
+ value->PinSelf(contents.data);
+ }
+
+ return s;
+}
+
+Status BlobDBImpl::Get(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) {
+ return Get(read_options, column_family, key, value, nullptr /*expiration*/);
+}
+
+Status BlobDBImpl::Get(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) {
+ StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_GET);
+ return GetImpl(read_options, column_family, key, value, expiration);
+}
+
+Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration) {
+ if (column_family != DefaultColumnFamily()) {
+ return Status::NotSupported(
+ "Blob DB doesn't support non-default column family.");
+ }
+ // Get a snapshot to avoid blob file get deleted between we
+ // fetch and index entry and reading from the file.
+ // TODO(yiwu): For Get() retry if file not found would be a simpler strategy.
+ ReadOptions ro(read_options);
+ bool snapshot_created = SetSnapshotIfNeeded(&ro);
+
+ PinnableSlice index_entry;
+ Status s;
+ bool is_blob_index = false;
+ s = db_impl_->GetImpl(ro, column_family, key, &index_entry,
+ nullptr /*value_found*/, nullptr /*read_callback*/,
+ &is_blob_index);
+ TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
+ TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
+ if (expiration != nullptr) {
+ *expiration = kNoExpiration;
+ }
+ RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
+ if (s.ok()) {
+ if (is_blob_index) {
+ s = GetBlobValue(key, index_entry, value, expiration);
+ } else {
+ // The index entry is the value itself in this case.
+ value->PinSelf(index_entry);
+ }
+ RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
+ }
+ if (snapshot_created) {
+ db_->ReleaseSnapshot(ro.snapshot);
+ }
+ return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
+ ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
+ blob_files_.size());
+ ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
+ open_ttl_files_.size());
+
+ for (auto bfile : open_ttl_files_) {
+ assert(!bfile->Immutable());
+ }
+
+ uint64_t now = EpochNow();
+
+ for (auto blob_file_pair : blob_files_) {
+ auto blob_file = blob_file_pair.second;
+ char buf[1000];
+ int pos = snprintf(buf, sizeof(buf),
+ "Blob file %" PRIu64 ", size %" PRIu64
+ ", blob count %" PRIu64 ", immutable %d",
+ blob_file->BlobFileNumber(), blob_file->GetFileSize(),
+ blob_file->BlobCount(), blob_file->Immutable());
+ if (blob_file->HasTTL()) {
+ auto expiration_range = blob_file->GetExpirationRange();
+ pos += snprintf(buf + pos, sizeof(buf) - pos,
+ ", expiration range (%" PRIu64 ", %" PRIu64 ")",
+ expiration_range.first, expiration_range.second);
+ if (!blob_file->Obsolete()) {
+ pos += snprintf(buf + pos, sizeof(buf) - pos,
+ ", expire in %" PRIu64 " seconds",
+ expiration_range.second - now);
+ }
+ }
+ if (blob_file->Obsolete()) {
+ pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
+ blob_file->GetObsoleteSequence());
+ }
+ snprintf(buf + pos, sizeof(buf) - pos, ".");
+ ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
+ }
+
+ // reschedule
+ return std::make_pair(true, -1);
+}
+
+Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile,
+ bool need_lock) {
+ assert(bfile != nullptr);
+ write_mutex_.AssertHeld();
+ Status s;
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Closing blob file %" PRIu64 ". Path: %s",
+ bfile->BlobFileNumber(), bfile->PathName().c_str());
+ {
+ std::unique_ptr<WriteLock> lock;
+ if (need_lock) {
+ lock.reset(new WriteLock(&mutex_));
+ }
+
+ if (bfile->HasTTL()) {
+ size_t erased __attribute__((__unused__));
+ erased = open_ttl_files_.erase(bfile);
+ } else if (bfile == open_non_ttl_file_) {
+ open_non_ttl_file_ = nullptr;
+ }
+ }
+
+ if (!bfile->closed_.load()) {
+ std::unique_ptr<WriteLock> file_lock;
+ if (need_lock) {
+ file_lock.reset(new WriteLock(&bfile->mutex_));
+ }
+ s = bfile->WriteFooterAndCloseLocked();
+ }
+
+ if (s.ok()) {
+ total_blob_size_ += BlobLogFooter::kSize;
+ } else {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to close blob file %" PRIu64 "with error: %s",
+ bfile->BlobFileNumber(), s.ToString().c_str());
+ }
+
+ return s;
+}
+
+Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
+ // atomic read
+ if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
+ return Status::OK();
+ }
+ return CloseBlobFile(bfile);
+}
+
+void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
+ SequenceNumber obsolete_seq,
+ bool update_size) {
+ // Should hold write lock of mutex_ or during DB open.
+ blob_file->MarkObsolete(obsolete_seq);
+ obsolete_files_.push_back(blob_file);
+ assert(total_blob_size_.load() >= blob_file->GetFileSize());
+ if (update_size) {
+ total_blob_size_ -= blob_file->GetFileSize();
+ }
+}
+
+bool BlobDBImpl::VisibleToActiveSnapshot(
+ const std::shared_ptr<BlobFile>& bfile) {
+ assert(bfile->Obsolete());
+
+ // We check whether the oldest snapshot is no less than the last sequence
+ // by the time the blob file become obsolete. If so, the blob file is not
+ // visible to all existing snapshots.
+ //
+ // If we keep track of the earliest sequence of the keys in the blob file,
+ // we could instead check if there's a snapshot falls in range
+ // [earliest_sequence, obsolete_sequence). But doing so will make the
+ // implementation more complicated.
+ SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
+ SequenceNumber oldest_snapshot = kMaxSequenceNumber;
+ {
+ // Need to lock DBImpl mutex before access snapshot list.
+ InstrumentedMutexLock l(db_impl_->mutex());
+ auto& snapshots = db_impl_->snapshots();
+ if (!snapshots.empty()) {
+ oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
+ }
+ }
+ bool visible = oldest_snapshot < obsolete_sequence;
+ if (visible) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
+ ") visible to oldest snapshot %" PRIu64 ".",
+ bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
+ }
+ return visible;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
+
+ std::vector<std::shared_ptr<BlobFile>> process_files;
+ uint64_t now = EpochNow();
+ {
+ ReadLock rl(&mutex_);
+ for (auto p : blob_files_) {
+ auto& blob_file = p.second;
+ ReadLock file_lock(&blob_file->mutex_);
+ if (blob_file->HasTTL() && !blob_file->Obsolete() &&
+ blob_file->GetExpirationRange().second <= now) {
+ process_files.push_back(blob_file);
+ }
+ }
+ }
+
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
+ TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
+ TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
+
+ SequenceNumber seq = GetLatestSequenceNumber();
+ {
+ MutexLock l(&write_mutex_);
+ for (auto& blob_file : process_files) {
+ WriteLock file_lock(&blob_file->mutex_);
+ if (!blob_file->Immutable()) {
+ CloseBlobFile(blob_file, false /*need_lock*/);
+ }
+ // Need to double check if the file is obsolete.
+ if (!blob_file->Obsolete()) {
+ ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
+ }
+ }
+ }
+
+ return std::make_pair(true, -1);
+}
+
+Status BlobDBImpl::SyncBlobFiles() {
+ MutexLock l(&write_mutex_);
+
+ std::vector<std::shared_ptr<BlobFile>> process_files;
+ {
+ ReadLock rl(&mutex_);
+ for (auto fitr : open_ttl_files_) {
+ process_files.push_back(fitr);
+ }
+ if (open_non_ttl_file_ != nullptr) {
+ process_files.push_back(open_non_ttl_file_);
+ }
+ }
+
+ Status s;
+ for (auto& blob_file : process_files) {
+ s = blob_file->Fsync();
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to sync blob file %" PRIu64 ", status: %s",
+ blob_file->BlobFileNumber(), s.ToString().c_str());
+ return s;
+ }
+ }
+
+ s = dir_ent_->Fsync();
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to sync blob directory, status: %s",
+ s.ToString().c_str());
+ }
+ return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
+ if (aborted) return std::make_pair(false, -1);
+
+ if (open_file_count_.load() < kOpenFilesTrigger) {
+ return std::make_pair(true, -1);
+ }
+
+ // in the future, we should sort by last_access_
+ // instead of closing every file
+ ReadLock rl(&mutex_);
+ for (auto const& ent : blob_files_) {
+ auto bfile = ent.second;
+ if (bfile->last_access_.load() == -1) continue;
+
+ WriteLock lockbfile_w(&bfile->mutex_);
+ CloseRandomAccessLocked(bfile);
+ }
+
+ return std::make_pair(true, -1);
+}
+
+// Write callback for garbage collection to check if key has been updated
+// since last read. Similar to how OptimisticTransaction works. See inline
+// comment in GCFileAndUpdateLSM().
+class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
+ public:
+ GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
+ SequenceNumber upper_bound)
+ : cfd_(cfd), key_(key), upper_bound_(upper_bound) {}
+
+ Status Callback(DB* db) override {
+ auto* db_impl = reinterpret_cast<DBImpl*>(db);
+ auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
+ SequenceNumber latest_seq = 0;
+ bool found_record_for_key = false;
+ bool is_blob_index = false;
+ Status s = db_impl->GetLatestSequenceForKey(
+ sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
+ &is_blob_index);
+ db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
+ if (!s.ok() && !s.IsNotFound()) {
+ // Error.
+ assert(!s.IsBusy());
+ return s;
+ }
+ if (s.IsNotFound()) {
+ assert(!found_record_for_key);
+ return Status::Busy("Key deleted");
+ }
+ assert(found_record_for_key);
+ assert(is_blob_index);
+ if (latest_seq > upper_bound_) {
+ return Status::Busy("Key overwritten");
+ }
+ return s;
+ }
+
+ bool AllowWriteBatching() override { return false; }
+
+ private:
+ ColumnFamilyData* cfd_;
+ // Key to check
+ Slice key_;
+ // Upper bound of sequence number to proceed.
+ SequenceNumber upper_bound_;
+};
+
+// iterate over the blobs sequentially and check if the blob sequence number
+// is the latest. If it is the latest, preserve it, otherwise delete it
+// if it is TTL based, and the TTL has expired, then
+// we can blow the entity if the key is still the latest or the Key is not
+// found
+// WHAT HAPPENS IF THE KEY HAS BEEN OVERRIDEN. Then we can drop the blob
+// without doing anything if the earliest snapshot is not
+// referring to that sequence number, i.e. it is later than the sequence number
+// of the new key
+//
+// if it is not TTL based, then we can blow the key if the key has been
+// DELETED in the LSM
+Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
+ GCStats* gc_stats) {
+ StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS);
+ uint64_t now = EpochNow();
+
+ std::shared_ptr<Reader> reader =
+ bfptr->OpenRandomAccessReader(env_, db_options_, env_options_);
+ if (!reader) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "File sequential reader could not be opened for %s",
+ bfptr->PathName().c_str());
+ return Status::IOError("failed to create sequential reader");
+ }
+
+ BlobLogHeader header;
+ Status s = reader->ReadHeader(&header);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failure to read header for blob-file %s",
+ bfptr->PathName().c_str());
+ return s;
+ }
+
+ auto cfh = db_impl_->DefaultColumnFamily();
+ auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
+ auto column_family_id = cfd->GetID();
+ bool has_ttl = header.has_ttl;
+
+ // this reads the key but skips the blob
+ Reader::ReadLevel shallow = Reader::kReadHeaderKey;
+
+ bool file_expired = has_ttl && now >= bfptr->GetExpirationRange().second;
+
+ if (!file_expired) {
+ // read the blob because you have to write it back to new file
+ shallow = Reader::kReadHeaderKeyBlob;
+ }
+
+ BlobLogRecord record;
+ std::shared_ptr<BlobFile> newfile;
+ std::shared_ptr<Writer> new_writer;
+ uint64_t blob_offset = 0;
+
+ while (true) {
+ assert(s.ok());
+
+ // Read the next blob record.
+ Status read_record_status =
+ reader->ReadRecord(&record, shallow, &blob_offset);
+ // Exit if we reach the end of blob file.
+ // TODO(yiwu): properly handle ReadRecord error.
+ if (!read_record_status.ok()) {
+ break;
+ }
+ gc_stats->blob_count++;
+
+ // Similar to OptimisticTransaction, we obtain latest_seq from
+ // base DB, which is guaranteed to be no smaller than the sequence of
+ // current key. We use a WriteCallback on write to check the key sequence
+ // on write. If the key sequence is larger than latest_seq, we know
+ // a new versions is inserted and the old blob can be disgard.
+ //
+ // We cannot use OptimisticTransaction because we need to pass
+ // is_blob_index flag to GetImpl.
+ SequenceNumber latest_seq = GetLatestSequenceNumber();
+ bool is_blob_index = false;
+ PinnableSlice index_entry;
+ Status get_status = db_impl_->GetImpl(
+ ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
+ nullptr /*read_callback*/, &is_blob_index);
+ TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
+ if (!get_status.ok() && !get_status.IsNotFound()) {
+ // error
+ s = get_status;
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Error while getting index entry: %s",
+ s.ToString().c_str());
+ break;
+ }
+ if (get_status.IsNotFound() || !is_blob_index) {
+ // Either the key is deleted or updated with a newer version whish is
+ // inlined in LSM.
+ gc_stats->num_keys_overwritten++;
+ gc_stats->bytes_overwritten += record.record_size();
+ continue;
+ }
+
+ BlobIndex blob_index;
+ s = blob_index.DecodeFrom(index_entry);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Error while decoding index entry: %s",
+ s.ToString().c_str());
+ break;
+ }
+ if (blob_index.IsInlined() ||
+ blob_index.file_number() != bfptr->BlobFileNumber() ||
+ blob_index.offset() != blob_offset) {
+ // Key has been overwritten. Drop the blob record.
+ gc_stats->num_keys_overwritten++;
+ gc_stats->bytes_overwritten += record.record_size();
+ continue;
+ }
+
+ GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq);
+
+ // If key has expired, remove it from base DB.
+ // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter.
+ // We can just drop the blob record.
+ if (file_expired || (has_ttl && now >= record.expiration)) {
+ gc_stats->num_keys_expired++;
+ gc_stats->bytes_expired += record.record_size();
+ TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete");
+ WriteBatch delete_batch;
+ Status delete_status = delete_batch.Delete(record.key);
+ if (delete_status.ok()) {
+ delete_status = db_impl_->WriteWithCallback(WriteOptions(),
+ &delete_batch, &callback);
+ }
+ if (!delete_status.ok() && !delete_status.IsBusy()) {
+ // We hit an error.
+ s = delete_status;
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Error while deleting expired key: %s",
+ s.ToString().c_str());
+ break;
+ }
+ // Continue to next blob record or retry.
+ continue;
+ }
+
+ // Relocate the blob record to new file.
+ if (!newfile) {
+ // new file
+ std::string reason("GC of ");
+ reason += bfptr->PathName();
+ newfile = NewBlobFile(reason);
+
+ s = CheckOrCreateWriterLocked(newfile, &new_writer);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "Failed to open file %s for writer, error: %s",
+ newfile->PathName().c_str(), s.ToString().c_str());
+ break;
+ }
+ // Can't use header beyond this point
+ newfile->header_ = std::move(header);
+ newfile->header_valid_ = true;
+ newfile->file_size_ = BlobLogHeader::kSize;
+ newfile->SetColumnFamilyId(bfptr->column_family_id());
+ newfile->SetHasTTL(bfptr->HasTTL());
+ newfile->SetCompression(bfptr->compression());
+ newfile->expiration_range_ = bfptr->expiration_range_;
+
+ s = new_writer->WriteHeader(newfile->header_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "File: %s - header writing failed",
+ newfile->PathName().c_str());
+ break;
+ }
+
+ // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to
+ // avoid user writes writing to the file, and avoid
+ // EvictExpiredFiles close the file by mistake.
+ WriteLock wl(&mutex_);
+ blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile));
+ }
+
+ std::string new_index_entry;
+ uint64_t new_blob_offset = 0;
+ uint64_t new_key_offset = 0;
+ // write the blob to the blob log.
+ s = new_writer->AddRecord(record.key, record.value, record.expiration,
+ &new_key_offset, &new_blob_offset);
+
+ BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(),
+ new_blob_offset, record.value.size(),
+ bdb_options_.compression);
+
+ newfile->blob_count_++;
+ newfile->file_size_ +=
+ BlobLogRecord::kHeaderSize + record.key.size() + record.value.size();
+
+ TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate");
+ WriteBatch rewrite_batch;
+ Status rewrite_status = WriteBatchInternal::PutBlobIndex(
+ &rewrite_batch, column_family_id, record.key, new_index_entry);
+ if (rewrite_status.ok()) {
+ rewrite_status = db_impl_->WriteWithCallback(WriteOptions(),
+ &rewrite_batch, &callback);
+ }
+ if (rewrite_status.ok()) {
+ gc_stats->num_keys_relocated++;
+ gc_stats->bytes_relocated += record.record_size();
+ } else if (rewrite_status.IsBusy()) {
+ // The key is overwritten in the meanwhile. Drop the blob record.
+ gc_stats->num_keys_overwritten++;
+ gc_stats->bytes_overwritten += record.record_size();
+ } else {
+ // We hit an error.
+ s = rewrite_status;
+ ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s",
+ s.ToString().c_str());
+ break;
+ }
+ } // end of ReadRecord loop
+
+ {
+ WriteLock wl(&mutex_);
+ ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/);
+ }
+
+ ROCKS_LOG_INFO(
+ db_options_.info_log,
+ "%s blob file %" PRIu64 ". Total blob records: %" PRIu64
+ ", expired: %" PRIu64 " keys/%" PRIu64
+ " bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64
+ " bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.",
+ s.ok() ? "Successfully garbage collected" : "Failed to garbage collect",
+ bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired,
+ gc_stats->bytes_expired, gc_stats->num_keys_overwritten,
+ gc_stats->bytes_overwritten, gc_stats->num_keys_relocated,
+ gc_stats->bytes_relocated);
+ RecordTick(statistics_, BLOB_DB_GC_NUM_FILES);
+ RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN,
+ gc_stats->num_keys_overwritten);
+ RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED,
+ gc_stats->num_keys_expired);
+ RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN,
+ gc_stats->bytes_overwritten);
+ RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired);
+ if (newfile != nullptr) {
+ {
+ MutexLock l(&write_mutex_);
+ CloseBlobFile(newfile);
+ }
+ total_blob_size_ += newfile->file_size_;
+ ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".",
+ newfile->BlobFileNumber());
+ RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES);
+ RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
+ gc_stats->num_keys_relocated);
+ RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED,
+ gc_stats->bytes_relocated);
+ }
+ if (!s.ok()) {
+ RecordTick(statistics_, BLOB_DB_GC_FAILURES);
+ }
+ return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ MutexLock delete_file_lock(&delete_file_mutex_);
+ if (disable_file_deletions_ > 0) {
+ return std::make_pair(true, -1);
+ }
+
+ std::list<std::shared_ptr<BlobFile>> tobsolete;
+ {
+ WriteLock wl(&mutex_);
+ if (obsolete_files_.empty()) {
+ return std::make_pair(true, -1);
+ }
+ tobsolete.swap(obsolete_files_);
+ }
+
+ bool file_deleted = false;
+ for (auto iter = tobsolete.begin(); iter != tobsolete.end();) {
+ auto bfile = *iter;
+ {
+ ReadLock lockbfile_r(&bfile->mutex_);
+ if (VisibleToActiveSnapshot(bfile)) {
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Could not delete file due to snapshot failure %s",
+ bfile->PathName().c_str());
+ ++iter;
+ continue;
+ }
+ }
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Will delete file due to snapshot success %s",
+ bfile->PathName().c_str());
+
+ blob_files_.erase(bfile->BlobFileNumber());
+ Status s = DeleteDBFile(&(db_impl_->immutable_db_options()),
+ bfile->PathName(), blob_dir_, true);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log,
+ "File failed to be deleted as obsolete %s",
+ bfile->PathName().c_str());
+ ++iter;
+ continue;
+ }
+
+ file_deleted = true;
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "File deleted as obsolete from blob dir %s",
+ bfile->PathName().c_str());
+
+ iter = tobsolete.erase(iter);
+ }
+
+ // directory change. Fsync
+ if (file_deleted) {
+ Status s = dir_ent_->Fsync();
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s",
+ blob_dir_.c_str(), s.ToString().c_str());
+ }
+ }
+
+ // put files back into obsolete if for some reason, delete failed
+ if (!tobsolete.empty()) {
+ WriteLock wl(&mutex_);
+ for (auto bfile : tobsolete) {
+ obsolete_files_.push_front(bfile);
+ }
+ }
+
+ return std::make_pair(!aborted, -1);
+}
+
+void BlobDBImpl::CopyBlobFiles(
+ std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) {
+ ReadLock rl(&mutex_);
+ for (auto const& p : blob_files_) {
+ bfiles_copy->push_back(p.second);
+ }
+}
+
+std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) {
+ if (aborted) {
+ return std::make_pair(false, -1);
+ }
+
+ // TODO(yiwu): Garbage collection implementation.
+
+ // reschedule
+ return std::make_pair(true, -1);
+}
+
+Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) {
+ auto* cfd =
+ reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd();
+ // Get a snapshot to avoid blob file get deleted between we
+ // fetch and index entry and reading from the file.
+ ManagedSnapshot* own_snapshot = nullptr;
+ const Snapshot* snapshot = read_options.snapshot;
+ if (snapshot == nullptr) {
+ own_snapshot = new ManagedSnapshot(db_);
+ snapshot = own_snapshot->snapshot();
+ }
+ auto* iter = db_impl_->NewIteratorImpl(
+ read_options, cfd, snapshot->GetSequenceNumber(),
+ nullptr /*read_callback*/, true /*allow_blob*/);
+ return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_);
+}
+
+Status DestroyBlobDB(const std::string& dbname, const Options& options,
+ const BlobDBOptions& bdb_options) {
+ const ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
+ Env* env = soptions.env;
+
+ Status status;
+ std::string blobdir;
+ blobdir = (bdb_options.path_relative) ? dbname + "/" + bdb_options.blob_dir
+ : bdb_options.blob_dir;
+
+ std::vector<std::string> filenames;
+ env->GetChildren(blobdir, &filenames);
+
+ for (const auto& f : filenames) {
+ uint64_t number;
+ FileType type;
+ if (ParseFileName(f, &number, &type) && type == kBlobFile) {
+ Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true);
+ if (status.ok() && !del.ok()) {
+ status = del;
+ }
+ }
+ }
+ env->DeleteDir(blobdir);
+
+ Status destroy = DestroyDB(dbname, options);
+ if (status.ok() && !destroy.ok()) {
+ status = destroy;
+ }
+
+ return status;
+}
+
+#ifndef NDEBUG
+Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value) {
+ return GetBlobValue(key, index_entry, value);
+}
+
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
+ ReadLock l(&mutex_);
+ std::vector<std::shared_ptr<BlobFile>> blob_files;
+ for (auto& p : blob_files_) {
+ blob_files.emplace_back(p.second);
+ }
+ return blob_files;
+}
+
+std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
+ const {
+ ReadLock l(&mutex_);
+ std::vector<std::shared_ptr<BlobFile>> obsolete_files;
+ for (auto& bfile : obsolete_files_) {
+ obsolete_files.emplace_back(bfile);
+ }
+ return obsolete_files;
+}
+
+void BlobDBImpl::TEST_DeleteObsoleteFiles() {
+ DeleteObsoleteFiles(false /*abort*/);
+}
+
+Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
+ MutexLock l(&write_mutex_);
+ return CloseBlobFile(bfile);
+}
+
+void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
+ SequenceNumber obsolete_seq,
+ bool update_size) {
+ return ObsoleteBlobFile(blob_file, obsolete_seq, update_size);
+}
+
+Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
+ GCStats* gc_stats) {
+ return GCFileAndUpdateLSM(bfile, gc_stats);
+}
+
+void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); }
+
+void BlobDBImpl::TEST_EvictExpiredFiles() {
+ EvictExpiredFiles(false /*abort*/);
+}
+
+uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); }
+#endif // !NDEBUG
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.h b/src/rocksdb/utilities/blob_db/blob_db_impl.h
new file mode 100644
index 00000000..0a22c0ac
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl.h
@@ -0,0 +1,425 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <condition_variable>
+#include <limits>
+#include <list>
+#include <memory>
+#include <set>
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "db/db_iter.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/db.h"
+#include "rocksdb/listener.h"
+#include "rocksdb/options.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/wal_filter.h"
+#include "util/mutexlock.h"
+#include "util/timer_queue.h"
+#include "utilities/blob_db/blob_db.h"
+#include "utilities/blob_db/blob_file.h"
+#include "utilities/blob_db/blob_log_format.h"
+#include "utilities/blob_db/blob_log_reader.h"
+#include "utilities/blob_db/blob_log_writer.h"
+
+namespace rocksdb {
+
+class DBImpl;
+class ColumnFamilyHandle;
+class ColumnFamilyData;
+struct FlushJobInfo;
+
+namespace blob_db {
+
+struct BlobCompactionContext;
+class BlobDBImpl;
+class BlobFile;
+
+// Comparator to sort "TTL" aware Blob files based on the lower value of
+// TTL range.
+struct BlobFileComparatorTTL {
+ bool operator()(const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const;
+};
+
+struct BlobFileComparator {
+ bool operator()(const std::shared_ptr<BlobFile>& lhs,
+ const std::shared_ptr<BlobFile>& rhs) const;
+};
+
+struct GCStats {
+ uint64_t blob_count = 0;
+ uint64_t num_keys_overwritten = 0;
+ uint64_t num_keys_expired = 0;
+ uint64_t num_keys_relocated = 0;
+ uint64_t bytes_overwritten = 0;
+ uint64_t bytes_expired = 0;
+ uint64_t bytes_relocated = 0;
+};
+
+/**
+ * The implementation class for BlobDB. This manages the value
+ * part in TTL aware sequentially written files. These files are
+ * Garbage Collected.
+ */
+class BlobDBImpl : public BlobDB {
+ friend class BlobFile;
+ friend class BlobDBIterator;
+
+ public:
+ // deletions check period
+ static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000;
+
+ // gc percentage each check period
+ static constexpr uint32_t kGCFilePercentage = 100;
+
+ // gc period
+ static constexpr uint32_t kGCCheckPeriodMillisecs = 60 * 1000;
+
+ // sanity check task
+ static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000;
+
+ // how many random access open files can we tolerate
+ static constexpr uint32_t kOpenFilesTrigger = 100;
+
+ // how often to schedule reclaim open files.
+ static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000;
+
+ // how often to schedule delete obs files periods
+ static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000;
+
+ // how often to schedule expired files eviction.
+ static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000;
+
+ // when should oldest file be evicted:
+ // on reaching 90% of blob_dir_size
+ static constexpr double kEvictOldestFileAtSize = 0.9;
+
+ using BlobDB::Put;
+ Status Put(const WriteOptions& options, const Slice& key,
+ const Slice& value) override;
+
+ using BlobDB::Get;
+ Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value) override;
+
+ Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value,
+ uint64_t* expiration) override;
+
+ using BlobDB::NewIterator;
+ virtual Iterator* NewIterator(const ReadOptions& read_options) override;
+
+ using BlobDB::NewIterators;
+ virtual Status NewIterators(
+ const ReadOptions& /*read_options*/,
+ const std::vector<ColumnFamilyHandle*>& /*column_families*/,
+ std::vector<Iterator*>* /*iterators*/) override {
+ return Status::NotSupported("Not implemented");
+ }
+
+ using BlobDB::MultiGet;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& read_options,
+ const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override;
+
+ virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
+ virtual Status Close() override;
+
+ using BlobDB::PutWithTTL;
+ Status PutWithTTL(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t ttl) override;
+
+ using BlobDB::PutUntil;
+ Status PutUntil(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration) override;
+
+ BlobDBOptions GetBlobDBOptions() const override;
+
+ BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options,
+ const DBOptions& db_options,
+ const ColumnFamilyOptions& cf_options);
+
+ virtual Status DisableFileDeletions() override;
+
+ virtual Status EnableFileDeletions(bool force) override;
+
+ virtual Status GetLiveFiles(std::vector<std::string>&,
+ uint64_t* manifest_file_size,
+ bool flush_memtable = true) override;
+ virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override;
+
+ ~BlobDBImpl();
+
+ Status Open(std::vector<ColumnFamilyHandle*>* handles);
+
+ Status SyncBlobFiles() override;
+
+ void UpdateLiveSSTSize();
+
+ void GetCompactionContext(BlobCompactionContext* context);
+
+#ifndef NDEBUG
+ Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value);
+
+ std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
+
+ std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const;
+
+ Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+
+ void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file,
+ SequenceNumber obsolete_seq = 0,
+ bool update_size = true);
+
+ Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
+ GCStats* gc_stats);
+
+ void TEST_RunGC();
+
+ void TEST_EvictExpiredFiles();
+
+ void TEST_DeleteObsoleteFiles();
+
+ uint64_t TEST_live_sst_size();
+
+ const std::string& TEST_blob_dir() const { return blob_dir_; }
+#endif // !NDEBUG
+
+ private:
+ class GarbageCollectionWriteCallback;
+ class BlobInserter;
+
+ // Create a snapshot if there isn't one in read options.
+ // Return true if a snapshot is created.
+ bool SetSnapshotIfNeeded(ReadOptions* read_options);
+
+ Status GetImpl(const ReadOptions& read_options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value, uint64_t* expiration = nullptr);
+
+ Status GetBlobValue(const Slice& key, const Slice& index_entry,
+ PinnableSlice* value, uint64_t* expiration = nullptr);
+
+ Slice GetCompressedSlice(const Slice& raw,
+ std::string* compression_output) const;
+
+ // Close a file by appending a footer, and removes file from open files list.
+ Status CloseBlobFile(std::shared_ptr<BlobFile> bfile, bool need_lock = true);
+
+ // Close a file if its size exceeds blob_file_size
+ Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile);
+
+ // Mark file as obsolete and move the file to obsolete file list.
+ //
+ // REQUIRED: hold write lock of mutex_ or during DB open.
+ void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
+ SequenceNumber obsolete_seq, bool update_size);
+
+ Status PutBlobValue(const WriteOptions& options, const Slice& key,
+ const Slice& value, uint64_t expiration,
+ WriteBatch* batch);
+
+ Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
+ const std::string& headerbuf, const Slice& key,
+ const Slice& value, uint64_t expiration,
+ std::string* index_entry);
+
+ // find an existing blob log file based on the expiration unix epoch
+ // if such a file does not exist, return nullptr
+ Status SelectBlobFileTTL(uint64_t expiration,
+ std::shared_ptr<BlobFile>* blob_file);
+
+ // find an existing blob log file to append the value to
+ Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file);
+
+ std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const;
+
+ // periodic sanity check. Bunch of checks
+ std::pair<bool, int64_t> SanityCheck(bool aborted);
+
+ // delete files which have been garbage collected and marked
+ // obsolete. Check whether any snapshots exist which refer to
+ // the same
+ std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted);
+
+ // Major task to garbage collect expired and deleted blobs
+ std::pair<bool, int64_t> RunGC(bool aborted);
+
+ // periodically check if open blob files and their TTL's has expired
+ // if expired, close the sequential writer and make the file immutable
+ std::pair<bool, int64_t> EvictExpiredFiles(bool aborted);
+
+ // if the number of open files, approaches ULIMIT's this
+ // task will close random readers, which are kept around for
+ // efficiency
+ std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted);
+
+ std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted);
+
+ // Adds the background tasks to the timer queue
+ void StartBackgroundTasks();
+
+ // add a new Blob File
+ std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason);
+
+ // collect all the blob log files from the blob directory
+ Status GetAllBlobFiles(std::set<uint64_t>* file_numbers);
+
+ // Open all blob files found in blob_dir.
+ Status OpenAllBlobFiles();
+
+ Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<RandomAccessFileReader>* reader);
+
+ // hold write mutex on file and call.
+ // Close the above Random Access reader
+ void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile);
+
+ // hold write mutex on file and call
+ // creates a sequential (append) writer for this blobfile
+ Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile);
+
+ // returns a Writer object for the file. If writer is not
+ // already present, creates one. Needs Write Mutex to be held
+ Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
+ std::shared_ptr<Writer>* writer);
+
+ // Iterate through keys and values on Blob and write into
+ // separate file the remaining blobs and delete/update pointers
+ // in LSM atomically
+ Status GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
+ GCStats* gcstats);
+
+ // checks if there is no snapshot which is referencing the
+ // blobs
+ bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
+ bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
+
+ void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
+
+ uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
+
+ // Check if inserting a new blob will make DB grow out of space.
+ // If is_fifo = true, FIFO eviction will be triggered to make room for the
+ // new blob. If force_evict = true, FIFO eviction will evict blob files
+ // even eviction will not make enough room for the new blob.
+ Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
+ bool force_evict = false);
+
+ // name of the database directory
+ std::string dbname_;
+
+ // the base DB
+ DBImpl* db_impl_;
+ Env* env_;
+
+ // the options that govern the behavior of Blob Storage
+ BlobDBOptions bdb_options_;
+ DBOptions db_options_;
+ ColumnFamilyOptions cf_options_;
+ EnvOptions env_options_;
+
+ // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
+ // ownership.
+ Statistics* statistics_;
+
+ // by default this is "blob_dir" under dbname_
+ // but can be configured
+ std::string blob_dir_;
+
+ // pointer to directory
+ std::unique_ptr<Directory> dir_ent_;
+
+ // Read Write Mutex, which protects all the data structures
+ // HEAVILY TRAFFICKED
+ mutable port::RWMutex mutex_;
+
+ // Writers has to hold write_mutex_ before writing.
+ mutable port::Mutex write_mutex_;
+
+ // counter for blob file number
+ std::atomic<uint64_t> next_file_number_;
+
+ // entire metadata of all the BLOB files memory
+ std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
+
+ // epoch or version of the open files.
+ std::atomic<uint64_t> epoch_of_;
+
+ // opened non-TTL blob file.
+ std::shared_ptr<BlobFile> open_non_ttl_file_;
+
+ // all the blob files which are currently being appended to based
+ // on variety of incoming TTL's
+ std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
+
+ // Flag to check whether Close() has been called on this DB
+ bool closed_;
+
+ // timer based queue to execute tasks
+ TimerQueue tqueue_;
+
+ // number of files opened for random access/GET
+ // counter is used to monitor and close excess RA files.
+ std::atomic<uint32_t> open_file_count_;
+
+ // Total size of all live blob files (i.e. exclude obsolete files).
+ std::atomic<uint64_t> total_blob_size_;
+
+ // total size of SST files.
+ std::atomic<uint64_t> live_sst_size_;
+
+ // Latest FIFO eviction timestamp
+ //
+ // REQUIRES: access with metex_ lock held.
+ uint64_t fifo_eviction_seq_;
+
+ // The expiration up to which latest FIFO eviction evicts.
+ //
+ // REQUIRES: access with metex_ lock held.
+ uint64_t evict_expiration_up_to_;
+
+ std::list<std::shared_ptr<BlobFile>> obsolete_files_;
+
+ // DeleteObsoleteFiles, DiableFileDeletions and EnableFileDeletions block
+ // on the mutex to avoid contention.
+ //
+ // While DeleteObsoleteFiles hold both mutex_ and delete_file_mutex_, note
+ // the difference. mutex_ only needs to be held when access the
+ // data-structure, and delete_file_mutex_ needs to be held the whole time
+ // during DeleteObsoleteFiles to avoid being run simultaneously with
+ // DisableFileDeletions.
+ //
+ // If both of mutex_ and delete_file_mutex_ needs to be held, it is adviced
+ // to hold delete_file_mutex_ first to avoid deadlock.
+ mutable port::Mutex delete_file_mutex_;
+
+ // Each call of DisableFileDeletions will increase disable_file_deletion_
+ // by 1. EnableFileDeletions will either decrease the count by 1 or reset
+ // it to zeor, depending on the force flag.
+ //
+ // REQUIRES: access with delete_file_mutex_ held.
+ int disable_file_deletions_ = 0;
+
+ uint32_t debug_level_;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc
new file mode 100644
index 00000000..8effe88c
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc
@@ -0,0 +1,108 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db_impl.h"
+
+#include "util/filename.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+
+// BlobDBImpl methods to get snapshot of files, e.g. for replication.
+
+namespace rocksdb {
+namespace blob_db {
+
+Status BlobDBImpl::DisableFileDeletions() {
+ // Disable base DB file deletions.
+ Status s = db_impl_->DisableFileDeletions();
+ if (!s.ok()) {
+ return s;
+ }
+
+ int count = 0;
+ {
+ // Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
+ // is running.
+ MutexLock l(&delete_file_mutex_);
+ count = ++disable_file_deletions_;
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Disalbed blob file deletions. count: %d", count);
+ return Status::OK();
+}
+
+Status BlobDBImpl::EnableFileDeletions(bool force) {
+ // Enable base DB file deletions.
+ Status s = db_impl_->EnableFileDeletions(force);
+ if (!s.ok()) {
+ return s;
+ }
+
+ int count = 0;
+ {
+ MutexLock l(&delete_file_mutex_);
+ if (force) {
+ disable_file_deletions_ = 0;
+ } else if (disable_file_deletions_ > 0) {
+ count = --disable_file_deletions_;
+ }
+ assert(count >= 0);
+ }
+
+ ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
+ count);
+ // Consider trigger DeleteobsoleteFiles once after re-enabled, if we are to
+ // make DeleteobsoleteFiles re-run interval configuration.
+ return Status::OK();
+}
+
+Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
+ uint64_t* manifest_file_size,
+ bool flush_memtable) {
+ if (!bdb_options_.path_relative) {
+ return Status::NotSupported(
+ "Not able to get relative blob file path from absolute blob_dir.");
+ }
+ // Hold a lock in the beginning to avoid updates to base DB during the call
+ ReadLock rl(&mutex_);
+ Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
+ if (!s.ok()) {
+ return s;
+ }
+ ret.reserve(ret.size() + blob_files_.size());
+ for (auto bfile_pair : blob_files_) {
+ auto blob_file = bfile_pair.second;
+ // Path should be relative to db_name, but begin with slash.
+ ret.emplace_back(
+ BlobFileName("", bdb_options_.blob_dir, blob_file->BlobFileNumber()));
+ }
+ return Status::OK();
+}
+
+void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
+ // Path should be relative to db_name.
+ assert(bdb_options_.path_relative);
+ // Hold a lock in the beginning to avoid updates to base DB during the call
+ ReadLock rl(&mutex_);
+ db_->GetLiveFilesMetaData(metadata);
+ for (auto bfile_pair : blob_files_) {
+ auto blob_file = bfile_pair.second;
+ LiveFileMetaData filemetadata;
+ filemetadata.size = static_cast<size_t>(blob_file->GetFileSize());
+ // Path should be relative to db_name, but begin with slash.
+ filemetadata.name =
+ BlobFileName("", bdb_options_.blob_dir, blob_file->BlobFileNumber());
+ auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
+ filemetadata.column_family_name = cfh->GetName();
+ metadata->emplace_back(filemetadata);
+ }
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_iterator.h b/src/rocksdb/utilities/blob_db/blob_db_iterator.h
new file mode 100644
index 00000000..1565c670
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_iterator.h
@@ -0,0 +1,148 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "monitoring/statistics.h"
+#include "rocksdb/iterator.h"
+#include "util/stop_watch.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+using rocksdb::ManagedSnapshot;
+
+class BlobDBIterator : public Iterator {
+ public:
+ BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter,
+ BlobDBImpl* blob_db, Env* env, Statistics* statistics)
+ : snapshot_(snapshot),
+ iter_(iter),
+ blob_db_(blob_db),
+ env_(env),
+ statistics_(statistics) {}
+
+ virtual ~BlobDBIterator() = default;
+
+ bool Valid() const override {
+ if (!iter_->Valid()) {
+ return false;
+ }
+ return status_.ok();
+ }
+
+ Status status() const override {
+ if (!iter_->status().ok()) {
+ return iter_->status();
+ }
+ return status_;
+ }
+
+ void SeekToFirst() override {
+ StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->SeekToFirst();
+ while (UpdateBlobValue()) {
+ iter_->Next();
+ }
+ }
+
+ void SeekToLast() override {
+ StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->SeekToLast();
+ while (UpdateBlobValue()) {
+ iter_->Prev();
+ }
+ }
+
+ void Seek(const Slice& target) override {
+ StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->Seek(target);
+ while (UpdateBlobValue()) {
+ iter_->Next();
+ }
+ }
+
+ void SeekForPrev(const Slice& target) override {
+ StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_SEEK);
+ iter_->SeekForPrev(target);
+ while (UpdateBlobValue()) {
+ iter_->Prev();
+ }
+ }
+
+ void Next() override {
+ assert(Valid());
+ StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_NEXT);
+ iter_->Next();
+ while (UpdateBlobValue()) {
+ iter_->Next();
+ }
+ }
+
+ void Prev() override {
+ assert(Valid());
+ StopWatch prev_sw(env_, statistics_, BLOB_DB_PREV_MICROS);
+ RecordTick(statistics_, BLOB_DB_NUM_PREV);
+ iter_->Prev();
+ while (UpdateBlobValue()) {
+ iter_->Prev();
+ }
+ }
+
+ Slice key() const override {
+ assert(Valid());
+ return iter_->key();
+ }
+
+ Slice value() const override {
+ assert(Valid());
+ if (!iter_->IsBlob()) {
+ return iter_->value();
+ }
+ return value_;
+ }
+
+ // Iterator::Refresh() not supported.
+
+ private:
+ // Return true if caller should continue to next value.
+ bool UpdateBlobValue() {
+ TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1");
+ TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2");
+ value_.Reset();
+ status_ = Status::OK();
+ if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) {
+ Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_);
+ if (s.IsNotFound()) {
+ return true;
+ } else {
+ if (!s.ok()) {
+ status_ = s;
+ }
+ return false;
+ }
+ } else {
+ return false;
+ }
+ }
+
+ std::unique_ptr<ManagedSnapshot> snapshot_;
+ std::unique_ptr<ArenaWrappedDBIter> iter_;
+ BlobDBImpl* blob_db_;
+ Env* env_;
+ Statistics* statistics_;
+ Status status_;
+ PinnableSlice value_;
+};
+} // namespace blob_db
+} // namespace rocksdb
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_listener.h b/src/rocksdb/utilities/blob_db/blob_db_listener.h
new file mode 100644
index 00000000..f096d238
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_listener.h
@@ -0,0 +1,46 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+
+#include "rocksdb/listener.h"
+#include "util/mutexlock.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+class BlobDBListener : public EventListener {
+ public:
+ explicit BlobDBListener(BlobDBImpl* blob_db_impl)
+ : blob_db_impl_(blob_db_impl) {}
+
+ void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override {
+ assert(blob_db_impl_ != nullptr);
+ blob_db_impl_->SyncBlobFiles();
+ }
+
+ void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override {
+ assert(blob_db_impl_ != nullptr);
+ blob_db_impl_->UpdateLiveSSTSize();
+ }
+
+ void OnCompactionCompleted(DB* /*db*/,
+ const CompactionJobInfo& /*info*/) override {
+ assert(blob_db_impl_ != nullptr);
+ blob_db_impl_->UpdateLiveSSTSize();
+ }
+
+ private:
+ BlobDBImpl* blob_db_impl_;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_test.cc b/src/rocksdb/utilities/blob_db/blob_db_test.cc
new file mode 100644
index 00000000..afb953df
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_test.cc
@@ -0,0 +1,1695 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include <algorithm>
+#include <chrono>
+#include <cstdlib>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "db/db_test_util.h"
+#include "port/port.h"
+#include "rocksdb/utilities/debug.h"
+#include "util/cast_util.h"
+#include "util/fault_injection_test_env.h"
+#include "util/file_util.h"
+#include "util/random.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "utilities/blob_db/blob_db.h"
+#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/blob_db/blob_index.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+class BlobDBTest : public testing::Test {
+ public:
+ const int kMaxBlobSize = 1 << 14;
+
+ struct BlobRecord {
+ std::string key;
+ std::string value;
+ uint64_t expiration = 0;
+ };
+
+ BlobDBTest()
+ : dbname_(test::PerThreadDBPath("blob_db_test")),
+ mock_env_(new MockTimeEnv(Env::Default())),
+ fault_injection_env_(new FaultInjectionTestEnv(Env::Default())),
+ blob_db_(nullptr) {
+ Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
+ assert(s.ok());
+ }
+
+ ~BlobDBTest() override {
+ SyncPoint::GetInstance()->ClearAllCallBacks();
+ Destroy();
+ }
+
+ Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
+ Options options = Options()) {
+ options.create_if_missing = true;
+ return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
+ }
+
+ void Open(BlobDBOptions bdb_options = BlobDBOptions(),
+ Options options = Options()) {
+ ASSERT_OK(TryOpen(bdb_options, options));
+ }
+
+ void Reopen(BlobDBOptions bdb_options = BlobDBOptions(),
+ Options options = Options()) {
+ assert(blob_db_ != nullptr);
+ delete blob_db_;
+ blob_db_ = nullptr;
+ Open(bdb_options, options);
+ }
+
+ void Close() {
+ assert(blob_db_ != nullptr);
+ delete blob_db_;
+ blob_db_ = nullptr;
+ }
+
+ void Destroy() {
+ if (blob_db_) {
+ Options options = blob_db_->GetOptions();
+ BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions();
+ delete blob_db_;
+ blob_db_ = nullptr;
+ ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options));
+ }
+ }
+
+ BlobDBImpl *blob_db_impl() {
+ return reinterpret_cast<BlobDBImpl *>(blob_db_);
+ }
+
+ Status Put(const Slice &key, const Slice &value,
+ std::map<std::string, std::string> *data = nullptr) {
+ Status s = blob_db_->Put(WriteOptions(), key, value);
+ if (data != nullptr) {
+ (*data)[key.ToString()] = value.ToString();
+ }
+ return s;
+ }
+
+ void Delete(const std::string &key,
+ std::map<std::string, std::string> *data = nullptr) {
+ ASSERT_OK(blob_db_->Delete(WriteOptions(), key));
+ if (data != nullptr) {
+ data->erase(key);
+ }
+ }
+
+ Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl,
+ std::map<std::string, std::string> *data = nullptr) {
+ Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl);
+ if (data != nullptr) {
+ (*data)[key.ToString()] = value.ToString();
+ }
+ return s;
+ }
+
+ Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) {
+ return blob_db_->PutUntil(WriteOptions(), key, value, expiration);
+ }
+
+ void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = test::RandomHumanReadableString(rnd, len);
+ ASSERT_OK(
+ blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ void PutRandomUntil(const std::string &key, uint64_t expiration, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = test::RandomHumanReadableString(rnd, len);
+ ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
+ expiration));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ void PutRandom(const std::string &key, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ PutRandom(blob_db_, key, rnd, data);
+ }
+
+ void PutRandom(DB *db, const std::string &key, Random *rnd,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = test::RandomHumanReadableString(rnd, len);
+ ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value)));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ void PutRandomToWriteBatch(
+ const std::string &key, Random *rnd, WriteBatch *batch,
+ std::map<std::string, std::string> *data = nullptr) {
+ int len = rnd->Next() % kMaxBlobSize + 1;
+ std::string value = test::RandomHumanReadableString(rnd, len);
+ ASSERT_OK(batch->Put(key, value));
+ if (data != nullptr) {
+ (*data)[key] = value;
+ }
+ }
+
+ // Verify blob db contain expected data and nothing more.
+ void VerifyDB(const std::map<std::string, std::string> &data) {
+ VerifyDB(blob_db_, data);
+ }
+
+ void VerifyDB(DB *db, const std::map<std::string, std::string> &data) {
+ // Verify normal Get
+ auto* cfh = db->DefaultColumnFamily();
+ for (auto &p : data) {
+ PinnableSlice value_slice;
+ ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice));
+ ASSERT_EQ(p.second, value_slice.ToString());
+ std::string value;
+ ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value));
+ ASSERT_EQ(p.second, value);
+ }
+
+ // Verify iterators
+ Iterator *iter = db->NewIterator(ReadOptions());
+ iter->SeekToFirst();
+ for (auto &p : data) {
+ ASSERT_TRUE(iter->Valid());
+ ASSERT_EQ(p.first, iter->key().ToString());
+ ASSERT_EQ(p.second, iter->value().ToString());
+ iter->Next();
+ }
+ ASSERT_FALSE(iter->Valid());
+ ASSERT_OK(iter->status());
+ delete iter;
+ }
+
+ void VerifyBaseDB(
+ const std::map<std::string, KeyVersion> &expected_versions) {
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ DB *db = blob_db_->GetRootDB();
+ const size_t kMaxKeys = 10000;
+ std::vector<KeyVersion> versions;
+ GetAllKeyVersions(db, "", "", kMaxKeys, &versions);
+ ASSERT_EQ(expected_versions.size(), versions.size());
+ size_t i = 0;
+ for (auto &key_version : expected_versions) {
+ const KeyVersion &expected_version = key_version.second;
+ ASSERT_EQ(expected_version.user_key, versions[i].user_key);
+ ASSERT_EQ(expected_version.sequence, versions[i].sequence);
+ ASSERT_EQ(expected_version.type, versions[i].type);
+ if (versions[i].type == kTypeValue) {
+ ASSERT_EQ(expected_version.value, versions[i].value);
+ } else {
+ ASSERT_EQ(kTypeBlobIndex, versions[i].type);
+ PinnableSlice value;
+ ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key,
+ versions[i].value, &value));
+ ASSERT_EQ(expected_version.value, value.ToString());
+ }
+ i++;
+ }
+ }
+
+ void InsertBlobs() {
+ WriteOptions wo;
+ std::string value;
+
+ Random rnd(301);
+ for (size_t i = 0; i < 100000; i++) {
+ uint64_t ttl = rnd.Next() % 86400;
+ PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr);
+ }
+
+ for (size_t i = 0; i < 10; i++) {
+ Delete("key" + ToString(i % 500));
+ }
+ }
+
+ const std::string dbname_;
+ std::unique_ptr<MockTimeEnv> mock_env_;
+ std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_;
+ BlobDB *blob_db_;
+}; // class BlobDBTest
+
+TEST_F(BlobDBTest, Put) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, PutWithTTL) {
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.min_blob_size = 0;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;
+ mock_env_->set_current_time(50);
+ for (size_t i = 0; i < 100; i++) {
+ uint64_t ttl = rnd.Next() % 100;
+ PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
+ (ttl <= 50 ? nullptr : &data));
+ }
+ mock_env_->set_current_time(100);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
+ ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, PutUntil) {
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 1000;
+ bdb_options.min_blob_size = 0;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;
+ mock_env_->set_current_time(50);
+ for (size_t i = 0; i < 100; i++) {
+ uint64_t expiration = rnd.Next() % 100 + 50;
+ PutRandomUntil("key" + ToString(i), expiration, &rnd,
+ (expiration <= 100 ? nullptr : &data));
+ }
+ mock_env_->set_current_time(100);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
+ GCStats gc_stats;
+ ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
+ ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, StackableDBGet) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ for (size_t i = 0; i < 100; i++) {
+ StackableDB *db = blob_db_;
+ ColumnFamilyHandle *column_family = db->DefaultColumnFamily();
+ std::string key = "key" + ToString(i);
+ PinnableSlice pinnable_value;
+ ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value));
+ std::string string_value;
+ ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value));
+ ASSERT_EQ(string_value, pinnable_value.ToString());
+ ASSERT_EQ(string_value, data[key]);
+ }
+}
+
+TEST_F(BlobDBTest, GetExpiration) {
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.disable_background_tasks = true;
+ mock_env_->set_current_time(100);
+ Open(bdb_options, options);
+ Put("key1", "value1");
+ PutWithTTL("key2", "value2", 200);
+ PinnableSlice value;
+ uint64_t expiration;
+ ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration));
+ ASSERT_EQ("value1", value.ToString());
+ ASSERT_EQ(kNoExpiration, expiration);
+ ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration));
+ ASSERT_EQ("value2", value.ToString());
+ ASSERT_EQ(300 /* = 100 + 200 */, expiration);
+}
+
+TEST_F(BlobDBTest, GetIOError) {
+ Options options;
+ options.env = fault_injection_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0; // Make sure value write to blob file
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily();
+ PinnableSlice value;
+ ASSERT_OK(Put("foo", "bar"));
+ fault_injection_env_->SetFilesystemActive(false, Status::IOError());
+ Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value);
+ ASSERT_TRUE(s.IsIOError());
+ // Reactivate file system to allow test to close DB.
+ fault_injection_env_->SetFilesystemActive(true);
+}
+
+TEST_F(BlobDBTest, PutIOError) {
+ Options options;
+ options.env = fault_injection_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0; // Make sure value write to blob file
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ fault_injection_env_->SetFilesystemActive(false, Status::IOError());
+ ASSERT_TRUE(Put("foo", "v1").IsIOError());
+ fault_injection_env_->SetFilesystemActive(true, Status::IOError());
+ ASSERT_OK(Put("bar", "v1"));
+}
+
+TEST_F(BlobDBTest, WriteBatch) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < 10; j++) {
+ PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data);
+ }
+ blob_db_->Write(WriteOptions(), &batch);
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, Delete) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ for (size_t i = 0; i < 100; i += 5) {
+ Delete("key" + ToString(i), &data);
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, DeleteBatch) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd);
+ }
+ WriteBatch batch;
+ for (size_t i = 0; i < 100; i++) {
+ batch.Delete("key" + ToString(i));
+ }
+ ASSERT_OK(blob_db_->Write(WriteOptions(), &batch));
+ // DB should be empty.
+ VerifyDB({});
+}
+
+TEST_F(BlobDBTest, Override) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (int i = 0; i < 10000; i++) {
+ PutRandom("key" + ToString(i), &rnd, nullptr);
+ }
+ // override all the keys
+ for (int i = 0; i < 10000; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ VerifyDB(data);
+}
+
+#ifdef SNAPPY
+TEST_F(BlobDBTest, Compression) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = CompressionType::kSnappyCompression;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("put-key" + ToString(i), &rnd, &data);
+ }
+ for (int i = 0; i < 100; i++) {
+ WriteBatch batch;
+ for (size_t j = 0; j < 10; j++) {
+ PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd,
+ &batch, &data);
+ }
+ blob_db_->Write(WriteOptions(), &batch);
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, DecompressAfterReopen) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ bdb_options.compression = CompressionType::kSnappyCompression;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("put-key" + ToString(i), &rnd, &data);
+ }
+ VerifyDB(data);
+ bdb_options.compression = CompressionType::kNoCompression;
+ Reopen(bdb_options);
+ VerifyDB(data);
+}
+#endif
+
+TEST_F(BlobDBTest, MultipleWriters) {
+ Open(BlobDBOptions());
+
+ std::vector<port::Thread> workers;
+ std::vector<std::map<std::string, std::string>> data_set(10);
+ for (uint32_t i = 0; i < 10; i++)
+ workers.push_back(port::Thread(
+ [&](uint32_t id) {
+ Random rnd(301 + id);
+ for (int j = 0; j < 100; j++) {
+ std::string key = "key" + ToString(id) + "_" + ToString(j);
+ if (id < 5) {
+ PutRandom(key, &rnd, &data_set[id]);
+ } else {
+ WriteBatch batch;
+ PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]);
+ blob_db_->Write(WriteOptions(), &batch);
+ }
+ }
+ },
+ i));
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 10; i++) {
+ workers[i].join();
+ data.insert(data_set[i].begin(), data_set[i].end());
+ }
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, GCAfterOverwriteKeys) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ DBImpl *db_impl = static_cast_with_check<DBImpl, DB>(blob_db_->GetBaseDB());
+ std::map<std::string, std::string> data;
+ for (int i = 0; i < 200; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+ // Test for data in SST
+ size_t new_keys = 0;
+ for (int i = 0; i < 100; i++) {
+ if (rnd.Next() % 2 == 1) {
+ new_keys++;
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ }
+ db_impl->TEST_FlushMemTable(true /*wait*/);
+ // Test for data in memtable
+ for (int i = 100; i < 200; i++) {
+ if (rnd.Next() % 2 == 1) {
+ new_keys++;
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ }
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(200, gc_stats.blob_count);
+ ASSERT_EQ(0, gc_stats.num_keys_expired);
+ ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1"));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
+ "BlobDBImpl::PutUntil:Start"},
+ {"BlobDBImpl::PutUntil:Finish",
+ "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ auto writer = port::Thread(
+ [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); });
+
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(1, gc_stats.blob_count);
+ ASSERT_EQ(0, gc_stats.num_keys_expired);
+ ASSERT_EQ(1, gc_stats.num_keys_overwritten);
+ ASSERT_EQ(0, gc_stats.num_keys_relocated);
+ writer.join();
+ VerifyDB({{"foo", "v2"}});
+}
+
+TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
+ Random rnd(301);
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options, options);
+ mock_env_->set_current_time(100);
+ ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+ mock_env_->set_current_time(300);
+
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
+ "BlobDBImpl::PutUntil:Start"},
+ {"BlobDBImpl::PutUntil:Finish",
+ "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}});
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ auto writer = port::Thread([this]() {
+ ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400));
+ });
+
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
+ ASSERT_EQ(1, gc_stats.blob_count);
+ ASSERT_EQ(1, gc_stats.num_keys_expired);
+ ASSERT_EQ(0, gc_stats.num_keys_relocated);
+ writer.join();
+ VerifyDB({{"foo", "v2"}});
+}
+
+TEST_F(BlobDBTest, NewFileGeneratedFromGCShouldMarkAsImmutable) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ ASSERT_OK(Put("foo", "bar"));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ auto blob_file1 = blob_files[0];
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file1));
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_file1, &gc_stats));
+ ASSERT_EQ(1, gc_stats.blob_count);
+ ASSERT_EQ(1, gc_stats.num_keys_relocated);
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(blob_file1, blob_files[0]);
+ ASSERT_TRUE(blob_files[1]->Immutable());
+}
+
+// This test is no longer valid since we now return an error when we go
+// over the configured max_db_size.
+// The test needs to be re-written later in such a way that writes continue
+// after a GC happens.
+TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) {
+ // Use mock env to stop wall clock.
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 100;
+ bdb_options.blob_file_size = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key_with_ttl", value, 60));
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value));
+ }
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(11, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ ASSERT_TRUE(blob_files[0]->Immutable());
+ for (int i = 1; i <= 10; i++) {
+ ASSERT_FALSE(blob_files[i]->HasTTL());
+ if (i < 10) {
+ ASSERT_TRUE(blob_files[i]->Immutable());
+ }
+ }
+ blob_db_impl()->TEST_RunGC();
+ // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC.
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(1, obsolete_files.size());
+ ASSERT_EQ(blob_files[1]->BlobFileNumber(),
+ obsolete_files[0]->BlobFileNumber());
+}
+
+TEST_F(BlobDBTest, ReadWhileGC) {
+ // run the same test for Get(), MultiGet() and Iterator each.
+ for (int i = 0; i < 2; i++) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ blob_db_->Put(WriteOptions(), "foo", "bar");
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ std::shared_ptr<BlobFile> bfile = blob_files[0];
+ uint64_t bfile_number = bfile->BlobFileNumber();
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+
+ switch (i) {
+ case 0:
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"BlobDBImpl::Get:AfterIndexEntryGet:1",
+ "BlobDBTest::ReadWhileGC:1"},
+ {"BlobDBTest::ReadWhileGC:2",
+ "BlobDBImpl::Get:AfterIndexEntryGet:2"}});
+ break;
+ case 1:
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"BlobDBIterator::UpdateBlobValue:Start:1",
+ "BlobDBTest::ReadWhileGC:1"},
+ {"BlobDBTest::ReadWhileGC:2",
+ "BlobDBIterator::UpdateBlobValue:Start:2"}});
+ break;
+ }
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ auto reader = port::Thread([this, i]() {
+ std::string value;
+ std::vector<std::string> values;
+ std::vector<Status> statuses;
+ switch (i) {
+ case 0:
+ ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value));
+ ASSERT_EQ("bar", value);
+ break;
+ case 1:
+ // VerifyDB use iterator to scan the DB.
+ VerifyDB({{"foo", "bar"}});
+ break;
+ }
+ });
+
+ TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ ASSERT_EQ(1, gc_stats.blob_count);
+ ASSERT_EQ(1, gc_stats.num_keys_relocated);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ // The file shouln't be deleted
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(1, obsolete_files.size());
+ ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
+ TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
+ reader.join();
+ SyncPoint::GetInstance()->DisableProcessing();
+
+ // The file is deleted this time
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB({{"foo", "bar"}});
+ Destroy();
+ }
+}
+
+TEST_F(BlobDBTest, SstFileManager) {
+ // run the same test for Get(), MultiGet() and Iterator each.
+ std::shared_ptr<SstFileManager> sst_file_manager(
+ NewSstFileManager(mock_env_.get()));
+ sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ SstFileManagerImpl *sfm =
+ static_cast<SstFileManagerImpl *>(sst_file_manager.get());
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ Options db_options;
+
+ int files_deleted_directly = 0;
+ int files_scheduled_to_delete = 0;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion",
+ [&](void * /*arg*/) { files_scheduled_to_delete++; });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile",
+ [&](void * /*arg*/) { files_deleted_directly++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+ db_options.sst_file_manager = sst_file_manager;
+
+ Open(bdb_options, db_options);
+
+ // Create one obselete file and clean it.
+ blob_db_->Put(WriteOptions(), "foo", "bar");
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ std::shared_ptr<BlobFile> bfile = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+
+ // Even if SSTFileManager is not set, DB is creating a dummy one.
+ ASSERT_EQ(1, files_scheduled_to_delete);
+ ASSERT_EQ(0, files_deleted_directly);
+ Destroy();
+ // Make sure that DestroyBlobDB() also goes through delete scheduler.
+ ASSERT_GE(files_scheduled_to_delete, 2);
+ // Due to a timing issue, the WAL may or may not be deleted directly. The
+ // blob file is first scheduled, followed by WAL. If the background trash
+ // thread does not wake up on time, the WAL file will be directly
+ // deleted as the trash size will be > DB size
+ ASSERT_LE(files_deleted_directly, 1);
+ SyncPoint::GetInstance()->DisableProcessing();
+ sfm->WaitForEmptyTrash();
+}
+
+TEST_F(BlobDBTest, SstFileManagerRestart) {
+ int files_deleted_directly = 0;
+ int files_scheduled_to_delete = 0;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion",
+ [&](void * /*arg*/) { files_scheduled_to_delete++; });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile",
+ [&](void * /*arg*/) { files_deleted_directly++; });
+
+ // run the same test for Get(), MultiGet() and Iterator each.
+ std::shared_ptr<SstFileManager> sst_file_manager(
+ NewSstFileManager(mock_env_.get()));
+ sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ SstFileManagerImpl *sfm =
+ static_cast<SstFileManagerImpl *>(sst_file_manager.get());
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ Options db_options;
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ db_options.sst_file_manager = sst_file_manager;
+
+ Open(bdb_options, db_options);
+ std::string blob_dir = blob_db_impl()->TEST_blob_dir();
+ blob_db_->Put(WriteOptions(), "foo", "bar");
+ Close();
+
+ // Create 3 dummy trash files under the blob_dir
+ CreateFile(db_options.env, blob_dir + "/000666.blob.trash", "", false);
+ CreateFile(db_options.env, blob_dir + "/000888.blob.trash", "", true);
+ CreateFile(db_options.env, blob_dir + "/something_not_match.trash", "",
+ false);
+
+ // Make sure that reopening the DB rescan the existing trash files
+ Open(bdb_options, db_options);
+ ASSERT_GE(files_scheduled_to_delete, 3);
+ // Depending on timing, the WAL file may or may not be directly deleted
+ ASSERT_LE(files_deleted_directly, 1);
+
+ sfm->WaitForEmptyTrash();
+
+ // There should be exact one file under the blob dir now.
+ std::vector<std::string> all_files;
+ ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
+ int nfiles = 0;
+ for (const auto &f : all_files) {
+ assert(!f.empty());
+ if (f[0] == '.') {
+ continue;
+ }
+ nfiles++;
+ }
+ ASSERT_EQ(nfiles, 1);
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ // i = when to take snapshot
+ for (int i = 0; i < 4; i++) {
+ for (bool delete_key : {true, false}) {
+ const Snapshot *snapshot = nullptr;
+ Destroy();
+ Open(bdb_options);
+ // First file
+ ASSERT_OK(Put("key1", "value"));
+ if (i == 0) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+ // Second file
+ ASSERT_OK(Put("key2", "value"));
+ if (i == 1) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ auto bfile = blob_files[1];
+ ASSERT_FALSE(bfile->Immutable());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+ // Third file
+ ASSERT_OK(Put("key3", "value"));
+ if (i == 2) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ if (delete_key) {
+ Delete("key2");
+ }
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ ASSERT_TRUE(bfile->Obsolete());
+ ASSERT_EQ(1, gc_stats.blob_count);
+ if (delete_key) {
+ ASSERT_EQ(0, gc_stats.num_keys_relocated);
+ } else {
+ ASSERT_EQ(1, gc_stats.num_keys_relocated);
+ }
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
+ bfile->GetObsoleteSequence());
+ if (i == 3) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ size_t num_files = delete_key ? 3 : 4;
+ ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ if (i == 3) {
+ // The snapshot shouldn't see data in bfile
+ ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ } else {
+ // The snapshot will see data in bfile, so the file shouldn't be deleted
+ ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
+ }
+ }
+ }
+}
+
+TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
+ Options options;
+ options.env = mock_env_.get();
+ mock_env_->set_current_time(0);
+ Open(BlobDBOptions(), options);
+ ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
+ ColumnFamilyHandle *handle = nullptr;
+ std::string value;
+ std::vector<std::string> values;
+ // The call simply pass through to base db. It should succeed.
+ ASSERT_OK(
+ blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle));
+ ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported());
+ ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60)
+ .IsNotSupported());
+ ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100)
+ .IsNotSupported());
+ WriteBatch batch;
+ batch.Put("k1", "v1");
+ batch.Put(handle, "k2", "v2");
+ ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported());
+ ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound());
+ ASSERT_TRUE(
+ blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported());
+ auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle},
+ {"k1", "k2"}, &values);
+ ASSERT_EQ(2, statuses.size());
+ ASSERT_TRUE(statuses[0].IsNotSupported());
+ ASSERT_TRUE(statuses[1].IsNotSupported());
+ ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle));
+ delete handle;
+}
+
+TEST_F(BlobDBTest, GetLiveFilesMetaData) {
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.blob_dir = "blob_dir";
+ bdb_options.path_relative = true;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (size_t i = 0; i < 100; i++) {
+ PutRandom("key" + ToString(i), &rnd, &data);
+ }
+ std::vector<LiveFileMetaData> metadata;
+ blob_db_->GetLiveFilesMetaData(&metadata);
+ ASSERT_EQ(1U, metadata.size());
+ // Path should be relative to db_name, but begin with slash.
+ std::string filename = "/blob_dir/000001.blob";
+ ASSERT_EQ(filename, metadata[0].name);
+ ASSERT_EQ("default", metadata[0].column_family_name);
+ std::vector<std::string> livefile;
+ uint64_t mfs;
+ ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false));
+ ASSERT_EQ(4U, livefile.size());
+ ASSERT_EQ(filename, livefile[3]);
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, MigrateFromPlainRocksDB) {
+ constexpr size_t kNumKey = 20;
+ constexpr size_t kNumIteration = 10;
+ Random rnd(301);
+ std::map<std::string, std::string> data;
+ std::vector<bool> is_blob(kNumKey, false);
+
+ // Write to plain rocksdb.
+ Options options;
+ options.create_if_missing = true;
+ DB *db = nullptr;
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ for (size_t i = 0; i < kNumIteration; i++) {
+ auto key_index = rnd.Next() % kNumKey;
+ std::string key = "key" + ToString(key_index);
+ PutRandom(db, key, &rnd, &data);
+ }
+ VerifyDB(db, data);
+ delete db;
+ db = nullptr;
+
+ // Open as blob db. Verify it can read existing data.
+ Open();
+ VerifyDB(blob_db_, data);
+ for (size_t i = 0; i < kNumIteration; i++) {
+ auto key_index = rnd.Next() % kNumKey;
+ std::string key = "key" + ToString(key_index);
+ is_blob[key_index] = true;
+ PutRandom(blob_db_, key, &rnd, &data);
+ }
+ VerifyDB(blob_db_, data);
+ delete blob_db_;
+ blob_db_ = nullptr;
+
+ // Verify plain db return error for keys written by blob db.
+ ASSERT_OK(DB::Open(options, dbname_, &db));
+ std::string value;
+ for (size_t i = 0; i < kNumKey; i++) {
+ std::string key = "key" + ToString(i);
+ Status s = db->Get(ReadOptions(), key, &value);
+ if (data.count(key) == 0) {
+ ASSERT_TRUE(s.IsNotFound());
+ } else if (is_blob[i]) {
+ ASSERT_TRUE(s.IsNotSupported());
+ } else {
+ ASSERT_OK(s);
+ ASSERT_EQ(data[key], value);
+ }
+ }
+ delete db;
+}
+
+// Test to verify that a NoSpace IOError Status is returned on reaching
+// max_db_size limit.
+TEST_F(BlobDBTest, OutOfSpace) {
+ // Use mock env to stop wall clock.
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 200;
+ bdb_options.is_fifo = false;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ // Each stored blob has an overhead of about 42 bytes currently.
+ // So a small key + a 100 byte blob should take up ~150 bytes in the db.
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
+
+ // Putting another blob should fail as ading it would exceed the max_db_size
+ // limit.
+ Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_TRUE(s.IsNoSpace());
+}
+
+TEST_F(BlobDBTest, FIFOEviction) {
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 200;
+ bdb_options.blob_file_size = 100;
+ bdb_options.is_fifo = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ std::atomic<int> evict_count{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictOldestBlobFile:Evicted",
+ [&](void *) { evict_count++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Each stored blob has an overhead of 32 bytes currently.
+ // So a 100 byte blob should take up 132 bytes.
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
+ VerifyDB({{"key1", value}});
+
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+
+ // Adding another 100 bytes blob would take the total size to 264 bytes
+ // (2*132). max_db_size will be exceeded
+ // than max_db_size and trigger FIFO eviction.
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
+ ASSERT_EQ(1, evict_count);
+ // key1 will exist until corresponding file be deleted.
+ VerifyDB({{"key1", value}, {"key2", value}});
+
+ // Adding another 100 bytes blob without TTL.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
+ ASSERT_EQ(2, evict_count);
+ // key1 and key2 will exist until corresponding file be deleted.
+ VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
+
+ // The fourth blob file, without TTL.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
+ ASSERT_EQ(3, evict_count);
+ VerifyDB(
+ {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(4, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->Obsolete());
+ ASSERT_TRUE(blob_files[1]->Obsolete());
+ ASSERT_TRUE(blob_files[2]->Obsolete());
+ ASSERT_FALSE(blob_files[3]->Obsolete());
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(3, obsolete_files.size());
+ ASSERT_EQ(blob_files[0], obsolete_files[0]);
+ ASSERT_EQ(blob_files[1], obsolete_files[1]);
+ ASSERT_EQ(blob_files[2], obsolete_files[2]);
+
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_TRUE(obsolete_files.empty());
+ VerifyDB({{"key4", value}});
+}
+
+TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
+ Options options;
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 1000;
+ bdb_options.blob_file_size = 5000;
+ bdb_options.is_fifo = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ std::atomic<int> evict_count{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictOldestBlobFile:Evicted",
+ [&](void *) { evict_count++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ std::string value(2000, 'v');
+ ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
+ ASSERT_EQ(0, evict_count);
+}
+
+TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
+ BlobDBOptions bdb_options;
+ bdb_options.is_fifo = true;
+ bdb_options.min_blob_size = 100;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ options.env = mock_env_.get();
+ options.disable_auto_compactions = true;
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ Open(bdb_options, options);
+
+ ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
+ std::string small_value(50, 'v');
+ std::map<std::string, std::string> data;
+ // Insert some data into LSM tree to make sure FIFO eviction take SST
+ // file size into account.
+ for (int i = 0; i < 1000; i++) {
+ ASSERT_OK(Put("key" + ToString(i), small_value, &data));
+ }
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ uint64_t live_sst_size = 0;
+ ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
+ &live_sst_size));
+ ASSERT_TRUE(live_sst_size > 0);
+ ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
+
+ bdb_options.max_db_size = live_sst_size + 2000;
+ Reopen(bdb_options, options);
+ ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
+
+ std::string value_1k(1000, 'v');
+ ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB(data);
+ // large_key2 evicts large_key1
+ ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ data.erase("large_key1");
+ VerifyDB(data);
+ // large_key3 get no enough space even after evicting large_key2, so it
+ // instead return no space error.
+ std::string value_2k(2000, 'v');
+ ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ // Verify large_key2 still exists.
+ VerifyDB(data);
+}
+
+// Test flush or compaction will trigger FIFO eviction since they update
+// total SST file size.
+TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) {
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 1000;
+ bdb_options.is_fifo = true;
+ bdb_options.min_blob_size = 100;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ options.env = mock_env_.get();
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ options.compression = kNoCompression;
+ Open(bdb_options, options);
+
+ std::string value(800, 'v');
+ ASSERT_OK(PutWithTTL("large_key", value, 60));
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB({{"large_key", value}});
+
+ // Insert some small keys and flush to bring DB out of space.
+ std::map<std::string, std::string> data;
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(Put("key" + ToString(i), "v", &data));
+ }
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+
+ // Verify large_key is deleted by FIFO eviction.
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB(data);
+}
+
+TEST_F(BlobDBTest, InlineSmallValues) {
+ constexpr uint64_t kMaxExpiration = 1000;
+ Random rnd(301);
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = kMaxExpiration;
+ bdb_options.min_blob_size = 100;
+ bdb_options.blob_file_size = 256 * 1000 * 1000;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ mock_env_->set_current_time(0);
+ Open(bdb_options, options);
+ std::map<std::string, std::string> data;
+ std::map<std::string, KeyVersion> versions;
+ for (size_t i = 0; i < 1000; i++) {
+ bool is_small_value = rnd.Next() % 2;
+ bool has_ttl = rnd.Next() % 2;
+ uint64_t expiration = rnd.Next() % kMaxExpiration;
+ int len = is_small_value ? 50 : 200;
+ std::string key = "key" + ToString(i);
+ std::string value = test::RandomHumanReadableString(&rnd, len);
+ std::string blob_index;
+ data[key] = value;
+ SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1;
+ if (!has_ttl) {
+ ASSERT_OK(blob_db_->Put(WriteOptions(), key, value));
+ } else {
+ ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration));
+ }
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence);
+ versions[key] =
+ KeyVersion(key, value, sequence,
+ (is_small_value && !has_ttl) ? kTypeValue : kTypeBlobIndex);
+ }
+ VerifyDB(data);
+ VerifyBaseDB(versions);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ std::shared_ptr<BlobFile> non_ttl_file;
+ std::shared_ptr<BlobFile> ttl_file;
+ if (blob_files[0]->HasTTL()) {
+ ttl_file = blob_files[0];
+ non_ttl_file = blob_files[1];
+ } else {
+ non_ttl_file = blob_files[0];
+ ttl_file = blob_files[1];
+ }
+ ASSERT_FALSE(non_ttl_file->HasTTL());
+ ASSERT_TRUE(ttl_file->HasTTL());
+}
+
+TEST_F(BlobDBTest, CompactionFilterNotSupported) {
+ class TestCompactionFilter : public CompactionFilter {
+ const char *Name() const override { return "TestCompactionFilter"; }
+ };
+ class TestCompactionFilterFactory : public CompactionFilterFactory {
+ const char *Name() const override { return "TestCompactionFilterFactory"; }
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context & /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
+ }
+ };
+ for (int i = 0; i < 2; i++) {
+ Options options;
+ if (i == 0) {
+ options.compaction_filter = new TestCompactionFilter();
+ } else {
+ options.compaction_filter_factory.reset(
+ new TestCompactionFilterFactory());
+ }
+ ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
+ delete options.compaction_filter;
+ }
+}
+
+// Test comapction filter should remove any expired blob index.
+TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
+ constexpr size_t kNumKeys = 100;
+ constexpr size_t kNumPuts = 1000;
+ constexpr uint64_t kMaxExpiration = 1000;
+ constexpr uint64_t kCompactTime = 500;
+ constexpr uint64_t kMinBlobSize = 100;
+ Random rnd(301);
+ mock_env_->set_current_time(0);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = kMinBlobSize;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ for (size_t i = 0; i < kNumPuts; i++) {
+ bool is_small_value = rnd.Next() % 2;
+ bool has_ttl = rnd.Next() % 2;
+ uint64_t expiration = rnd.Next() % kMaxExpiration;
+ int len = is_small_value ? 10 : 200;
+ std::string key = "key" + ToString(rnd.Next() % kNumKeys);
+ std::string value = test::RandomHumanReadableString(&rnd, len);
+ if (!has_ttl) {
+ if (is_small_value) {
+ std::string blob_entry;
+ BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
+ // Fake blob index with TTL. See what it will do.
+ ASSERT_GT(kMinBlobSize, blob_entry.size());
+ value = blob_entry;
+ }
+ ASSERT_OK(Put(key, value));
+ data_after_compact[key] = value;
+ } else {
+ ASSERT_OK(PutUntil(key, value, expiration));
+ if (expiration <= kCompactTime) {
+ data_after_compact.erase(key);
+ } else {
+ data_after_compact[key] = value;
+ }
+ }
+ data[key] = value;
+ }
+ VerifyDB(data);
+
+ mock_env_->set_current_time(kCompactTime);
+ // Take a snapshot before compaction. Make sure expired blob indexes is
+ // filtered regardless of snapshot.
+ const Snapshot *snapshot = blob_db_->GetSnapshot();
+ // Issue manual compaction to trigger compaction filter.
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
+ blob_db_->DefaultColumnFamily(), nullptr,
+ nullptr));
+ blob_db_->ReleaseSnapshot(snapshot);
+ // Verify expired blob index are filtered.
+ std::vector<KeyVersion> versions;
+ const size_t kMaxKeys = 10000;
+ GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
+ ASSERT_EQ(data_after_compact.size(), versions.size());
+ for (auto &version : versions) {
+ ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
+ }
+ VerifyDB(data_after_compact);
+}
+
+// Test compaction filter should remove any blob index where corresponding
+// blob file has been removed (either by FIFO or garbage collection).
+TEST_F(BlobDBTest, FilterFileNotAvailable) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.disable_auto_compactions = true;
+ Open(bdb_options, options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+ ASSERT_OK(Put("bar", "v2"));
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
+
+ const size_t kMaxKeys = 10000;
+
+ DB *base_db = blob_db_->GetRootDB();
+ std::vector<KeyVersion> versions;
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(2, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ ASSERT_EQ("foo", versions[1].user_key);
+ VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(2, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ ASSERT_EQ("foo", versions[1].user_key);
+ VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+ // Remove the first blob file and compact. foo should be remove from base db.
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(1, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ VerifyDB({{"bar", "v2"}});
+
+ // Remove the second blob file and compact. bar should be remove from base db.
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(0, versions.size());
+ VerifyDB({});
+}
+
+// Test compaction filter should filter any inlined TTL keys that would have
+// been dropped by last FIFO eviction if they are store out-of-line.
+TEST_F(BlobDBTest, FilterForFIFOEviction) {
+ Random rnd(215);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 100;
+ bdb_options.ttl_range_secs = 60;
+ bdb_options.max_db_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ mock_env_->set_current_time(0);
+ options.env = mock_env_.get();
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ options.disable_auto_compactions = true;
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ // Insert some small values that will be inlined.
+ for (int i = 0; i < 1000; i++) {
+ std::string key = "key" + ToString(i);
+ std::string value = test::RandomHumanReadableString(&rnd, 50);
+ uint64_t ttl = rnd.Next() % 120 + 1;
+ ASSERT_OK(PutWithTTL(key, value, ttl, &data));
+ if (ttl >= 60) {
+ data_after_compact[key] = value;
+ }
+ }
+ uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
+ ASSERT_GT(live_sst_size, 0);
+ VerifyDB(data);
+
+ bdb_options.max_db_size = live_sst_size + 30000;
+ bdb_options.is_fifo = true;
+ Reopen(bdb_options, options);
+ VerifyDB(data);
+
+ // Put two large values, each on a different blob file.
+ std::string large_value(10000, 'v');
+ ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
+ ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ data["large_key1"] = large_value;
+ data["large_key2"] = large_value;
+ VerifyDB(data);
+
+ // Put a third large value which will bring the DB out of space.
+ // FIFO eviction will evict the file of large_key1.
+ ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ data.erase("large_key1");
+ data["large_key3"] = large_value;
+ VerifyDB(data);
+
+ // Putting some more small values. These values shouldn't be evicted by
+ // compaction filter since they are inserted after FIFO eviction.
+ ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
+ ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
+
+ // FIFO eviction doesn't trigger again since there enough room for the flush.
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+
+ // Manual compact and check if compaction filter evict those keys with
+ // expiration < 60.
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // All keys with expiration < 60, plus large_key1 is filtered by
+ // compaction filter.
+ ASSERT_EQ(num_keys_to_evict + 1,
+ statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ data_after_compact["large_key2"] = large_value;
+ data_after_compact["large_key3"] = large_value;
+ VerifyDB(data_after_compact);
+}
+
+// File should be evicted after expiration.
+TEST_F(BlobDBTest, EvictExpiredFile) {
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ Open(bdb_options, options);
+ mock_env_->set_current_time(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+ mock_env_->set_current_time(250);
+ // The key should expired now.
+ blob_db_impl()->TEST_EvictExpiredFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ ASSERT_TRUE(blob_file->Immutable());
+ ASSERT_TRUE(blob_file->Obsolete());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ // Make sure we don't return garbage value after blob file being evicted,
+ // but the blob index still exists in the LSM tree.
+ std::string val = "";
+ ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound());
+ ASSERT_EQ("", val);
+}
+
+TEST_F(BlobDBTest, DisableFileDeletions) {
+ BlobDBOptions bdb_options;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::map<std::string, std::string> data;
+ for (bool force : {true, false}) {
+ ASSERT_OK(Put("foo", "v", &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file));
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_file);
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ // Call DisableFileDeletions twice.
+ ASSERT_OK(blob_db_->DisableFileDeletions());
+ ASSERT_OK(blob_db_->DisableFileDeletions());
+ // File deletions should be disabled.
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB(data);
+ // Enable file deletions once. If force=true, file deletion is enabled.
+ // Otherwise it needs to enable it for a second time.
+ ASSERT_OK(blob_db_->EnableFileDeletions(force));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ if (!force) {
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB(data);
+ // Call EnableFileDeletions a second time.
+ ASSERT_OK(blob_db_->EnableFileDeletions(false));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ }
+ // Regardless of value of `force`, file should be deleted by now.
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB({});
+ }
+}
+
+TEST_F(BlobDBTest, ShutdownWait) {
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = false;
+ Options options;
+ options.env = mock_env_.get();
+
+ SyncPoint::GetInstance()->LoadDependency({
+ {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"},
+ {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"},
+ {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"},
+ {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"},
+ });
+ // Force all tasks to be scheduled immediately.
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "TimeQueue::Add:item.end", [&](void *arg) {
+ std::chrono::steady_clock::time_point *tp =
+ static_cast<std::chrono::steady_clock::time_point *>(arg);
+ *tp =
+ std::chrono::steady_clock::now() - std::chrono::milliseconds(10000);
+ });
+
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) {
+ // Sleep 3 ms to increase the chance of data race.
+ // We've synced up the code so that EvictExpiredFiles()
+ // is called concurrently with ~BlobDBImpl().
+ // ~BlobDBImpl() is supposed to wait for all background
+ // task to shutdown before doing anything else. In order
+ // to use the same test to reproduce a bug of the waiting
+ // logic, we wait a little bit here, so that TSAN can
+ // catch the data race.
+ // We should improve the test if we find a better way.
+ Env::Default()->SleepForMicroseconds(3000);
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Open(bdb_options, options);
+ mock_env_->set_current_time(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
+ mock_env_->set_current_time(250);
+ // The key should expired now.
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
+ Close();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+
+// A black-box test for the ttl wrapper around rocksdb
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // !ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.cc b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc
new file mode 100644
index 00000000..37eee19d
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc
@@ -0,0 +1,279 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "utilities/blob_db/blob_dump_tool.h"
+#include <inttypes.h>
+#include <stdio.h>
+#include <iostream>
+#include <memory>
+#include <string>
+#include "port/port.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/env.h"
+#include "table/format.h"
+#include "util/coding.h"
+#include "util/file_reader_writer.h"
+#include "util/string_util.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+BlobDumpTool::BlobDumpTool()
+ : reader_(nullptr), buffer_(nullptr), buffer_size_(0) {}
+
+Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key,
+ DisplayType show_blob,
+ DisplayType show_uncompressed_blob,
+ bool show_summary) {
+ constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
+ Status s;
+ Env* env = Env::Default();
+ s = env->FileExists(filename);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t file_size = 0;
+ s = env->GetFileSize(filename, &file_size);
+ if (!s.ok()) {
+ return s;
+ }
+ std::unique_ptr<RandomAccessFile> file;
+ s = env->NewRandomAccessFile(filename, &file, EnvOptions());
+ if (!s.ok()) {
+ return s;
+ }
+ file = NewReadaheadRandomAccessFile(std::move(file), kReadaheadSize);
+ if (file_size == 0) {
+ return Status::Corruption("File is empty.");
+ }
+ reader_.reset(new RandomAccessFileReader(std::move(file), filename));
+ uint64_t offset = 0;
+ uint64_t footer_offset = 0;
+ CompressionType compression = kNoCompression;
+ s = DumpBlobLogHeader(&offset, &compression);
+ if (!s.ok()) {
+ return s;
+ }
+ s = DumpBlobLogFooter(file_size, &footer_offset);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t total_records = 0;
+ uint64_t total_key_size = 0;
+ uint64_t total_blob_size = 0;
+ uint64_t total_uncompressed_blob_size = 0;
+ if (show_key != DisplayType::kNone || show_summary) {
+ while (offset < footer_offset) {
+ s = DumpRecord(show_key, show_blob, show_uncompressed_blob, show_summary,
+ compression, &offset, &total_records, &total_key_size,
+ &total_blob_size, &total_uncompressed_blob_size);
+ if (!s.ok()) {
+ break;
+ }
+ }
+ }
+ if (show_summary) {
+ fprintf(stdout, "Summary:\n");
+ fprintf(stdout, " total records: %" PRIu64 "\n", total_records);
+ fprintf(stdout, " total key size: %" PRIu64 "\n", total_key_size);
+ fprintf(stdout, " total blob size: %" PRIu64 "\n", total_blob_size);
+ if (compression != kNoCompression) {
+ fprintf(stdout, " total raw blob size: %" PRIu64 "\n",
+ total_uncompressed_blob_size);
+ }
+ }
+ return s;
+}
+
+Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) {
+ if (buffer_size_ < size) {
+ if (buffer_size_ == 0) {
+ buffer_size_ = 4096;
+ }
+ while (buffer_size_ < size) {
+ buffer_size_ *= 2;
+ }
+ buffer_.reset(new char[buffer_size_]);
+ }
+ Status s = reader_->Read(offset, size, result, buffer_.get());
+ if (!s.ok()) {
+ return s;
+ }
+ if (result->size() != size) {
+ return Status::Corruption("Reach the end of the file unexpectedly.");
+ }
+ return s;
+}
+
+Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset,
+ CompressionType* compression) {
+ Slice slice;
+ Status s = Read(0, BlobLogHeader::kSize, &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ BlobLogHeader header;
+ s = header.DecodeFrom(slice);
+ if (!s.ok()) {
+ return s;
+ }
+ fprintf(stdout, "Blob log header:\n");
+ fprintf(stdout, " Version : %" PRIu32 "\n", header.version);
+ fprintf(stdout, " Column Family ID : %" PRIu32 "\n",
+ header.column_family_id);
+ std::string compression_str;
+ if (!GetStringFromCompressionType(&compression_str, header.compression)
+ .ok()) {
+ compression_str = "Unrecongnized compression type (" +
+ ToString((int)header.compression) + ")";
+ }
+ fprintf(stdout, " Compression : %s\n", compression_str.c_str());
+ fprintf(stdout, " Expiration range : %s\n",
+ GetString(header.expiration_range).c_str());
+ *offset = BlobLogHeader::kSize;
+ *compression = header.compression;
+ return s;
+}
+
+Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size,
+ uint64_t* footer_offset) {
+ auto no_footer = [&]() {
+ *footer_offset = file_size;
+ fprintf(stdout, "No blob log footer.\n");
+ return Status::OK();
+ };
+ if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
+ return no_footer();
+ }
+ Slice slice;
+ *footer_offset = file_size - BlobLogFooter::kSize;
+ Status s = Read(*footer_offset, BlobLogFooter::kSize, &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ BlobLogFooter footer;
+ s = footer.DecodeFrom(slice);
+ if (!s.ok()) {
+ return no_footer();
+ }
+ fprintf(stdout, "Blob log footer:\n");
+ fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count);
+ fprintf(stdout, " Expiration Range : %s\n",
+ GetString(footer.expiration_range).c_str());
+ return s;
+}
+
+Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob,
+ DisplayType show_uncompressed_blob,
+ bool show_summary, CompressionType compression,
+ uint64_t* offset, uint64_t* total_records,
+ uint64_t* total_key_size,
+ uint64_t* total_blob_size,
+ uint64_t* total_uncompressed_blob_size) {
+ if (show_key != DisplayType::kNone) {
+ fprintf(stdout, "Read record with offset 0x%" PRIx64 " (%" PRIu64 "):\n",
+ *offset, *offset);
+ }
+ Slice slice;
+ Status s = Read(*offset, BlobLogRecord::kHeaderSize, &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ BlobLogRecord record;
+ s = record.DecodeHeaderFrom(slice);
+ if (!s.ok()) {
+ return s;
+ }
+ uint64_t key_size = record.key_size;
+ uint64_t value_size = record.value_size;
+ if (show_key != DisplayType::kNone) {
+ fprintf(stdout, " key size : %" PRIu64 "\n", key_size);
+ fprintf(stdout, " value size : %" PRIu64 "\n", value_size);
+ fprintf(stdout, " expiration : %" PRIu64 "\n", record.expiration);
+ }
+ *offset += BlobLogRecord::kHeaderSize;
+ s = Read(*offset, static_cast<size_t>(key_size + value_size), &slice);
+ if (!s.ok()) {
+ return s;
+ }
+ // Decompress value
+ std::string uncompressed_value;
+ if (compression != kNoCompression &&
+ (show_uncompressed_blob != DisplayType::kNone || show_summary)) {
+ BlockContents contents;
+ UncompressionContext context(compression);
+ UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+ compression);
+ s = UncompressBlockContentsForCompressionType(
+ info, slice.data() + key_size, static_cast<size_t>(value_size),
+ &contents, 2 /*compress_format_version*/,
+ ImmutableCFOptions(Options()));
+ if (!s.ok()) {
+ return s;
+ }
+ uncompressed_value = contents.data.ToString();
+ }
+ if (show_key != DisplayType::kNone) {
+ fprintf(stdout, " key : ");
+ DumpSlice(Slice(slice.data(), static_cast<size_t>(key_size)), show_key);
+ if (show_blob != DisplayType::kNone) {
+ fprintf(stdout, " blob : ");
+ DumpSlice(Slice(slice.data() + static_cast<size_t>(key_size), static_cast<size_t>(value_size)), show_blob);
+ }
+ if (show_uncompressed_blob != DisplayType::kNone) {
+ fprintf(stdout, " raw blob : ");
+ DumpSlice(Slice(uncompressed_value), show_uncompressed_blob);
+ }
+ }
+ *offset += key_size + value_size;
+ *total_records += 1;
+ *total_key_size += key_size;
+ *total_blob_size += value_size;
+ *total_uncompressed_blob_size += uncompressed_value.size();
+ return s;
+}
+
+void BlobDumpTool::DumpSlice(const Slice s, DisplayType type) {
+ if (type == DisplayType::kRaw) {
+ fprintf(stdout, "%s\n", s.ToString().c_str());
+ } else if (type == DisplayType::kHex) {
+ fprintf(stdout, "%s\n", s.ToString(true /*hex*/).c_str());
+ } else if (type == DisplayType::kDetail) {
+ char buf[100];
+ for (size_t i = 0; i < s.size(); i += 16) {
+ memset(buf, 0, sizeof(buf));
+ for (size_t j = 0; j < 16 && i + j < s.size(); j++) {
+ unsigned char c = s[i + j];
+ snprintf(buf + j * 3 + 15, 2, "%x", c >> 4);
+ snprintf(buf + j * 3 + 16, 2, "%x", c & 0xf);
+ snprintf(buf + j + 65, 2, "%c", (0x20 <= c && c <= 0x7e) ? c : '.');
+ }
+ for (size_t p = 0; p < sizeof(buf) - 1; p++) {
+ if (buf[p] == 0) {
+ buf[p] = ' ';
+ }
+ }
+ fprintf(stdout, "%s\n", i == 0 ? buf + 15 : buf);
+ }
+ }
+}
+
+template <class T>
+std::string BlobDumpTool::GetString(std::pair<T, T> p) {
+ if (p.first == 0 && p.second == 0) {
+ return "nil";
+ }
+ return "(" + ToString(p.first) + ", " + ToString(p.second) + ")";
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.h b/src/rocksdb/utilities/blob_db/blob_dump_tool.h
new file mode 100644
index 00000000..ff4672fd
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.h
@@ -0,0 +1,57 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+#include <string>
+#include <utility>
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "util/file_reader_writer.h"
+#include "utilities/blob_db/blob_log_format.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+class BlobDumpTool {
+ public:
+ enum class DisplayType {
+ kNone,
+ kRaw,
+ kHex,
+ kDetail,
+ };
+
+ BlobDumpTool();
+
+ Status Run(const std::string& filename, DisplayType show_key,
+ DisplayType show_blob, DisplayType show_uncompressed_blob,
+ bool show_summary);
+
+ private:
+ std::unique_ptr<RandomAccessFileReader> reader_;
+ std::unique_ptr<char[]> buffer_;
+ size_t buffer_size_;
+
+ Status Read(uint64_t offset, size_t size, Slice* result);
+ Status DumpBlobLogHeader(uint64_t* offset, CompressionType* compression);
+ Status DumpBlobLogFooter(uint64_t file_size, uint64_t* footer_offset);
+ Status DumpRecord(DisplayType show_key, DisplayType show_blob,
+ DisplayType show_uncompressed_blob, bool show_summary,
+ CompressionType compression, uint64_t* offset,
+ uint64_t* total_records, uint64_t* total_key_size,
+ uint64_t* total_blob_size,
+ uint64_t* total_uncompressed_blob_size);
+ void DumpSlice(const Slice s, DisplayType type);
+
+ template <class T>
+ std::string GetString(std::pair<T, T> p);
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_file.cc b/src/rocksdb/utilities/blob_db/blob_file.cc
new file mode 100644
index 00000000..3bcbd048
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_file.cc
@@ -0,0 +1,336 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+#include "utilities/blob_db/blob_file.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <stdio.h>
+
+#include <algorithm>
+#include <limits>
+#include <memory>
+
+#include "db/column_family.h"
+#include "db/db_impl.h"
+#include "db/dbformat.h"
+#include "util/filename.h"
+#include "util/logging.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace rocksdb {
+
+namespace blob_db {
+
+BlobFile::BlobFile()
+ : parent_(nullptr),
+ file_number_(0),
+ info_log_(nullptr),
+ column_family_id_(std::numeric_limits<uint32_t>::max()),
+ compression_(kNoCompression),
+ has_ttl_(false),
+ blob_count_(0),
+ file_size_(0),
+ closed_(false),
+ obsolete_(false),
+ expiration_range_({0, 0}),
+ last_access_(-1),
+ last_fsync_(0),
+ header_valid_(false),
+ footer_valid_(false) {}
+
+BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn,
+ Logger* info_log)
+ : parent_(p),
+ path_to_dir_(bdir),
+ file_number_(fn),
+ info_log_(info_log),
+ column_family_id_(std::numeric_limits<uint32_t>::max()),
+ compression_(kNoCompression),
+ has_ttl_(false),
+ blob_count_(0),
+ file_size_(0),
+ closed_(false),
+ obsolete_(false),
+ expiration_range_({0, 0}),
+ last_access_(-1),
+ last_fsync_(0),
+ header_valid_(false),
+ footer_valid_(false) {}
+
+BlobFile::~BlobFile() {
+ if (obsolete_) {
+ std::string pn(PathName());
+ Status s = Env::Default()->DeleteFile(PathName());
+ if (!s.ok()) {
+ // ROCKS_LOG_INFO(db_options_.info_log,
+ // "File could not be deleted %s", pn.c_str());
+ }
+ }
+}
+
+uint32_t BlobFile::column_family_id() const { return column_family_id_; }
+
+std::string BlobFile::PathName() const {
+ return BlobFileName(path_to_dir_, file_number_);
+}
+
+std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader(
+ Env* env, const DBOptions& db_options,
+ const EnvOptions& env_options) const {
+ constexpr size_t kReadaheadSize = 2 * 1024 * 1024;
+ std::unique_ptr<RandomAccessFile> sfile;
+ std::string path_name(PathName());
+ Status s = env->NewRandomAccessFile(path_name, &sfile, env_options);
+ if (!s.ok()) {
+ // report something here.
+ return nullptr;
+ }
+ sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize);
+
+ std::unique_ptr<RandomAccessFileReader> sfile_reader;
+ sfile_reader.reset(new RandomAccessFileReader(std::move(sfile), path_name));
+
+ std::shared_ptr<Reader> log_reader = std::make_shared<Reader>(
+ std::move(sfile_reader), db_options.env, db_options.statistics.get());
+
+ return log_reader;
+}
+
+std::string BlobFile::DumpState() const {
+ char str[1000];
+ snprintf(
+ str, sizeof(str),
+ "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64
+ " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64
+ "), writer: %d reader: %d",
+ path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(),
+ closed_.load(), obsolete_.load(), expiration_range_.first,
+ expiration_range_.second, (!!log_writer_), (!!ra_file_reader_));
+ return str;
+}
+
+void BlobFile::MarkObsolete(SequenceNumber sequence) {
+ assert(Immutable());
+ obsolete_sequence_ = sequence;
+ obsolete_.store(true);
+}
+
+bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const {
+ assert(last_fsync_ <= file_size_);
+ return (hard) ? file_size_ > last_fsync_
+ : (file_size_ - last_fsync_) >= bytes_per_sync;
+}
+
+Status BlobFile::WriteFooterAndCloseLocked() {
+ BlobLogFooter footer;
+ footer.blob_count = blob_count_;
+ if (HasTTL()) {
+ footer.expiration_range = expiration_range_;
+ }
+
+ // this will close the file and reset the Writable File Pointer.
+ Status s = log_writer_->AppendFooter(footer);
+ if (s.ok()) {
+ closed_ = true;
+ file_size_ += BlobLogFooter::kSize;
+ }
+ // delete the sequential writer
+ log_writer_.reset();
+ return s;
+}
+
+Status BlobFile::ReadFooter(BlobLogFooter* bf) {
+ if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
+ return Status::IOError("File does not have footer", PathName());
+ }
+
+ uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
+ // assume that ra_file_reader_ is valid before we enter this
+ assert(ra_file_reader_);
+
+ Slice result;
+ char scratch[BlobLogFooter::kSize + 10];
+ Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize, &result,
+ scratch);
+ if (!s.ok()) return s;
+ if (result.size() != BlobLogFooter::kSize) {
+ // should not happen
+ return Status::IOError("EOF reached before footer");
+ }
+
+ s = bf->DecodeFrom(result);
+ return s;
+}
+
+Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
+ // assume that file has been fully fsync'd
+ last_fsync_.store(file_size_);
+ blob_count_ = footer.blob_count;
+ expiration_range_ = footer.expiration_range;
+ closed_ = true;
+ return Status::OK();
+}
+
+Status BlobFile::Fsync() {
+ Status s;
+ if (log_writer_.get()) {
+ s = log_writer_->Sync();
+ last_fsync_.store(file_size_.load());
+ }
+ return s;
+}
+
+void BlobFile::CloseRandomAccessLocked() {
+ ra_file_reader_.reset();
+ last_access_ = -1;
+}
+
+Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
+ std::shared_ptr<RandomAccessFileReader>* reader,
+ bool* fresh_open) {
+ assert(reader != nullptr);
+ assert(fresh_open != nullptr);
+ *fresh_open = false;
+ int64_t current_time = 0;
+ env->GetCurrentTime(&current_time);
+ last_access_.store(current_time);
+ Status s;
+
+ {
+ ReadLock lockbfile_r(&mutex_);
+ if (ra_file_reader_) {
+ *reader = ra_file_reader_;
+ return s;
+ }
+ }
+
+ WriteLock lockbfile_w(&mutex_);
+ // Double check.
+ if (ra_file_reader_) {
+ *reader = ra_file_reader_;
+ return s;
+ }
+
+ std::unique_ptr<RandomAccessFile> rfile;
+ s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to open blob file for random-read: %s status: '%s'"
+ " exists: '%s'",
+ PathName().c_str(), s.ToString().c_str(),
+ env->FileExists(PathName()).ToString().c_str());
+ return s;
+ }
+
+ ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile),
+ PathName());
+ *reader = ra_file_reader_;
+ *fresh_open = true;
+ return s;
+}
+
+Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
+ assert(Immutable());
+ // Get file size.
+ uint64_t file_size = 0;
+ Status s = env->GetFileSize(PathName(), &file_size);
+ if (s.ok()) {
+ file_size_ = file_size;
+ } else {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to get size of blob file %" PRIu64
+ ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ if (file_size < BlobLogHeader::kSize) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Incomplete blob file blob file %" PRIu64
+ ", size: %" PRIu64,
+ file_number_, file_size);
+ return Status::Corruption("Incomplete blob file header.");
+ }
+
+ // Create file reader.
+ std::unique_ptr<RandomAccessFile> file;
+ s = env->NewRandomAccessFile(PathName(), &file, env_options);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to open blob file %" PRIu64 ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ std::unique_ptr<RandomAccessFileReader> file_reader(
+ new RandomAccessFileReader(std::move(file), PathName()));
+
+ // Read file header.
+ char header_buf[BlobLogHeader::kSize];
+ Slice header_slice;
+ s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to read header of blob file %" PRIu64
+ ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ BlobLogHeader header;
+ s = header.DecodeFrom(header_slice);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to decode header of blob file %" PRIu64
+ ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ column_family_id_ = header.column_family_id;
+ compression_ = header.compression;
+ has_ttl_ = header.has_ttl;
+ if (has_ttl_) {
+ expiration_range_ = header.expiration_range;
+ }
+ header_valid_ = true;
+
+ // Read file footer.
+ if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) {
+ // OK not to have footer.
+ assert(!footer_valid_);
+ return Status::OK();
+ }
+ char footer_buf[BlobLogFooter::kSize];
+ Slice footer_slice;
+ s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize,
+ &footer_slice, footer_buf);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(info_log_,
+ "Failed to read footer of blob file %" PRIu64
+ ", status: %s",
+ file_number_, s.ToString().c_str());
+ return s;
+ }
+ BlobLogFooter footer;
+ s = footer.DecodeFrom(footer_slice);
+ if (!s.ok()) {
+ // OK not to have footer.
+ assert(!footer_valid_);
+ return Status::OK();
+ }
+ blob_count_ = footer.blob_count;
+ if (has_ttl_) {
+ assert(header.expiration_range.first <= footer.expiration_range.first);
+ assert(header.expiration_range.second >= footer.expiration_range.second);
+ expiration_range_ = footer.expiration_range;
+ }
+ footer_valid_ = true;
+ return Status::OK();
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_file.h b/src/rocksdb/utilities/blob_db/blob_file.h
new file mode 100644
index 00000000..668a0372
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_file.h
@@ -0,0 +1,214 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <atomic>
+#include <memory>
+
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "util/file_reader_writer.h"
+#include "utilities/blob_db/blob_log_format.h"
+#include "utilities/blob_db/blob_log_reader.h"
+#include "utilities/blob_db/blob_log_writer.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+class BlobDBImpl;
+
+class BlobFile {
+ friend class BlobDBImpl;
+ friend struct BlobFileComparator;
+ friend struct BlobFileComparatorTTL;
+
+ private:
+ // access to parent
+ const BlobDBImpl* parent_;
+
+ // path to blob directory
+ std::string path_to_dir_;
+
+ // the id of the file.
+ // the above 2 are created during file creation and never changed
+ // after that
+ uint64_t file_number_;
+
+ // Info log.
+ Logger* info_log_;
+
+ // Column family id.
+ uint32_t column_family_id_;
+
+ // Compression type of blobs in the file
+ CompressionType compression_;
+
+ // If true, the keys in this file all has TTL. Otherwise all keys don't
+ // have TTL.
+ bool has_ttl_;
+
+ // number of blobs in the file
+ std::atomic<uint64_t> blob_count_;
+
+ // size of the file
+ std::atomic<uint64_t> file_size_;
+
+ BlobLogHeader header_;
+
+ // closed_ = true implies the file is no more mutable
+ // no more blobs will be appended and the footer has been written out
+ std::atomic<bool> closed_;
+
+ // has a pass of garbage collection successfully finished on this file
+ // obsolete_ still needs to do iterator/snapshot checks
+ std::atomic<bool> obsolete_;
+
+ // The last sequence number by the time the file marked as obsolete.
+ // Data in this file is visible to a snapshot taken before the sequence.
+ SequenceNumber obsolete_sequence_;
+
+ ExpirationRange expiration_range_;
+
+ // Sequential/Append writer for blobs
+ std::shared_ptr<Writer> log_writer_;
+
+ // random access file reader for GET calls
+ std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
+
+ // This Read-Write mutex is per file specific and protects
+ // all the datastructures
+ mutable port::RWMutex mutex_;
+
+ // time when the random access reader was last created.
+ std::atomic<std::int64_t> last_access_;
+
+ // last time file was fsync'd/fdatasyncd
+ std::atomic<uint64_t> last_fsync_;
+
+ bool header_valid_;
+
+ bool footer_valid_;
+
+ SequenceNumber garbage_collection_finish_sequence_;
+
+ public:
+ BlobFile();
+
+ BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
+ Logger* info_log);
+
+ ~BlobFile();
+
+ uint32_t column_family_id() const;
+
+ void SetColumnFamilyId(uint32_t cf_id) {
+ column_family_id_ = cf_id;
+ }
+
+ // Returns log file's absolute pathname.
+ std::string PathName() const;
+
+ // Primary identifier for blob file.
+ // once the file is created, this never changes
+ uint64_t BlobFileNumber() const { return file_number_; }
+
+ // the following functions are atomic, and don't need
+ // read lock
+ uint64_t BlobCount() const {
+ return blob_count_.load(std::memory_order_acquire);
+ }
+
+ std::string DumpState() const;
+
+ // if the file is not taking any more appends.
+ bool Immutable() const { return closed_.load(); }
+
+ // Mark the file as immutable.
+ // REQUIRES: write lock held, or access from single thread (on DB open).
+ void MarkImmutable() { closed_ = true; }
+
+ // if the file has gone through GC and blobs have been relocated
+ bool Obsolete() const {
+ assert(Immutable() || !obsolete_.load());
+ return obsolete_.load();
+ }
+
+ // Mark file as obsolete by garbage collection. The file is not visible to
+ // snapshots with sequence greater or equal to the given sequence.
+ void MarkObsolete(SequenceNumber sequence);
+
+ SequenceNumber GetObsoleteSequence() const {
+ assert(Obsolete());
+ return obsolete_sequence_;
+ }
+
+ // we will assume this is atomic
+ bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
+
+ Status Fsync();
+
+ uint64_t GetFileSize() const {
+ return file_size_.load(std::memory_order_acquire);
+ }
+
+ // All Get functions which are not atomic, will need ReadLock on the mutex
+
+ ExpirationRange GetExpirationRange() const { return expiration_range_; }
+
+ void ExtendExpirationRange(uint64_t expiration) {
+ expiration_range_.first = std::min(expiration_range_.first, expiration);
+ expiration_range_.second = std::max(expiration_range_.second, expiration);
+ }
+
+ bool HasTTL() const { return has_ttl_; }
+
+ void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
+
+ CompressionType compression() const { return compression_; }
+
+ void SetCompression(CompressionType c) {
+ compression_ = c;
+ }
+
+ std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
+
+ // Read blob file header and footer. Return corruption if file header is
+ // malform or incomplete. If footer is malform or incomplete, set
+ // footer_valid_ to false and return Status::OK.
+ Status ReadMetadata(Env* env, const EnvOptions& env_options);
+
+ Status GetReader(Env* env, const EnvOptions& env_options,
+ std::shared_ptr<RandomAccessFileReader>* reader,
+ bool* fresh_open);
+
+ private:
+ std::shared_ptr<Reader> OpenRandomAccessReader(
+ Env* env, const DBOptions& db_options,
+ const EnvOptions& env_options) const;
+
+ Status ReadFooter(BlobLogFooter* footer);
+
+ Status WriteFooterAndCloseLocked();
+
+ void CloseRandomAccessLocked();
+
+ // this is used, when you are reading only the footer of a
+ // previously closed file
+ Status SetFromFooterLocked(const BlobLogFooter& footer);
+
+ void set_expiration_range(const ExpirationRange& expiration_range) {
+ expiration_range_ = expiration_range;
+ }
+
+ // The following functions are atomic, and don't need locks
+ void SetFileSize(uint64_t fs) { file_size_ = fs; }
+
+ void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
+};
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_index.h b/src/rocksdb/utilities/blob_db/blob_index.h
new file mode 100644
index 00000000..fd91b547
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_index.h
@@ -0,0 +1,161 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/options.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+// BlobIndex is a pointer to the blob and metadata of the blob. The index is
+// stored in base DB as ValueType::kTypeBlobIndex.
+// There are three types of blob index:
+//
+// kInlinedTTL:
+// +------+------------+---------------+
+// | type | expiration | value |
+// +------+------------+---------------+
+// | char | varint64 | variable size |
+// +------+------------+---------------+
+//
+// kBlob:
+// +------+-------------+----------+----------+-------------+
+// | type | file number | offset | size | compression |
+// +------+-------------+----------+----------+-------------+
+// | char | varint64 | varint64 | varint64 | char |
+// +------+-------------+----------+----------+-------------+
+//
+// kBlobTTL:
+// +------+------------+-------------+----------+----------+-------------+
+// | type | expiration | file number | offset | size | compression |
+// +------+------------+-------------+----------+----------+-------------+
+// | char | varint64 | varint64 | varint64 | varint64 | char |
+// +------+------------+-------------+----------+----------+-------------+
+//
+// There isn't a kInlined (without TTL) type since we can store it as a plain
+// value (i.e. ValueType::kTypeValue).
+class BlobIndex {
+ public:
+ enum class Type : unsigned char {
+ kInlinedTTL = 0,
+ kBlob = 1,
+ kBlobTTL = 2,
+ kUnknown = 3,
+ };
+
+ BlobIndex() : type_(Type::kUnknown) {}
+
+ bool IsInlined() const { return type_ == Type::kInlinedTTL; }
+
+ bool HasTTL() const {
+ return type_ == Type::kInlinedTTL || type_ == Type::kBlobTTL;
+ }
+
+ uint64_t expiration() const {
+ assert(HasTTL());
+ return expiration_;
+ }
+
+ const Slice& value() const {
+ assert(IsInlined());
+ return value_;
+ }
+
+ uint64_t file_number() const {
+ assert(!IsInlined());
+ return file_number_;
+ }
+
+ uint64_t offset() const {
+ assert(!IsInlined());
+ return offset_;
+ }
+
+ uint64_t size() const {
+ assert(!IsInlined());
+ return size_;
+ }
+
+ Status DecodeFrom(Slice slice) {
+ static const std::string kErrorMessage = "Error while decoding blob index";
+ assert(slice.size() > 0);
+ type_ = static_cast<Type>(*slice.data());
+ if (type_ >= Type::kUnknown) {
+ return Status::Corruption(
+ kErrorMessage,
+ "Unknown blob index type: " + ToString(static_cast<char>(type_)));
+ }
+ slice = Slice(slice.data() + 1, slice.size() - 1);
+ if (HasTTL()) {
+ if (!GetVarint64(&slice, &expiration_)) {
+ return Status::Corruption(kErrorMessage, "Corrupted expiration");
+ }
+ }
+ if (IsInlined()) {
+ value_ = slice;
+ } else {
+ if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) &&
+ GetVarint64(&slice, &size_) && slice.size() == 1) {
+ compression_ = static_cast<CompressionType>(*slice.data());
+ } else {
+ return Status::Corruption(kErrorMessage, "Corrupted blob offset");
+ }
+ }
+ return Status::OK();
+ }
+
+ static void EncodeInlinedTTL(std::string* dst, uint64_t expiration,
+ const Slice& value) {
+ assert(dst != nullptr);
+ dst->clear();
+ dst->reserve(1 + kMaxVarint64Length + value.size());
+ dst->push_back(static_cast<char>(Type::kInlinedTTL));
+ PutVarint64(dst, expiration);
+ dst->append(value.data(), value.size());
+ }
+
+ static void EncodeBlob(std::string* dst, uint64_t file_number,
+ uint64_t offset, uint64_t size,
+ CompressionType compression) {
+ assert(dst != nullptr);
+ dst->clear();
+ dst->reserve(kMaxVarint64Length * 3 + 2);
+ dst->push_back(static_cast<char>(Type::kBlob));
+ PutVarint64(dst, file_number);
+ PutVarint64(dst, offset);
+ PutVarint64(dst, size);
+ dst->push_back(static_cast<char>(compression));
+ }
+
+ static void EncodeBlobTTL(std::string* dst, uint64_t expiration,
+ uint64_t file_number, uint64_t offset,
+ uint64_t size, CompressionType compression) {
+ assert(dst != nullptr);
+ dst->clear();
+ dst->reserve(kMaxVarint64Length * 4 + 2);
+ dst->push_back(static_cast<char>(Type::kBlobTTL));
+ PutVarint64(dst, expiration);
+ PutVarint64(dst, file_number);
+ PutVarint64(dst, offset);
+ PutVarint64(dst, size);
+ dst->push_back(static_cast<char>(compression));
+ }
+
+ private:
+ Type type_ = Type::kUnknown;
+ uint64_t expiration_ = 0;
+ Slice value_;
+ uint64_t file_number_ = 0;
+ uint64_t offset_ = 0;
+ uint64_t size_ = 0;
+ CompressionType compression_ = kNoCompression;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_format.cc b/src/rocksdb/utilities/blob_db/blob_log_format.cc
new file mode 100644
index 00000000..8726cb8f
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_format.cc
@@ -0,0 +1,149 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_log_format.h"
+
+#include "util/coding.h"
+#include "util/crc32c.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+void BlobLogHeader::EncodeTo(std::string* dst) {
+ assert(dst != nullptr);
+ dst->clear();
+ dst->reserve(BlobLogHeader::kSize);
+ PutFixed32(dst, kMagicNumber);
+ PutFixed32(dst, version);
+ PutFixed32(dst, column_family_id);
+ unsigned char flags = (has_ttl ? 1 : 0);
+ dst->push_back(flags);
+ dst->push_back(compression);
+ PutFixed64(dst, expiration_range.first);
+ PutFixed64(dst, expiration_range.second);
+}
+
+Status BlobLogHeader::DecodeFrom(Slice src) {
+ static const std::string kErrorMessage =
+ "Error while decoding blob log header";
+ if (src.size() != BlobLogHeader::kSize) {
+ return Status::Corruption(kErrorMessage,
+ "Unexpected blob file header size");
+ }
+ uint32_t magic_number;
+ unsigned char flags;
+ if (!GetFixed32(&src, &magic_number) || !GetFixed32(&src, &version) ||
+ !GetFixed32(&src, &column_family_id)) {
+ return Status::Corruption(
+ kErrorMessage,
+ "Error decoding magic number, version and column family id");
+ }
+ if (magic_number != kMagicNumber) {
+ return Status::Corruption(kErrorMessage, "Magic number mismatch");
+ }
+ if (version != kVersion1) {
+ return Status::Corruption(kErrorMessage, "Unknown header version");
+ }
+ flags = src.data()[0];
+ compression = static_cast<CompressionType>(src.data()[1]);
+ has_ttl = (flags & 1) == 1;
+ src.remove_prefix(2);
+ if (!GetFixed64(&src, &expiration_range.first) ||
+ !GetFixed64(&src, &expiration_range.second)) {
+ return Status::Corruption(kErrorMessage, "Error decoding expiration range");
+ }
+ return Status::OK();
+}
+
+void BlobLogFooter::EncodeTo(std::string* dst) {
+ assert(dst != nullptr);
+ dst->clear();
+ dst->reserve(BlobLogFooter::kSize);
+ PutFixed32(dst, kMagicNumber);
+ PutFixed64(dst, blob_count);
+ PutFixed64(dst, expiration_range.first);
+ PutFixed64(dst, expiration_range.second);
+ crc = crc32c::Value(dst->c_str(), dst->size());
+ crc = crc32c::Mask(crc);
+ PutFixed32(dst, crc);
+}
+
+Status BlobLogFooter::DecodeFrom(Slice src) {
+ static const std::string kErrorMessage =
+ "Error while decoding blob log footer";
+ if (src.size() != BlobLogFooter::kSize) {
+ return Status::Corruption(kErrorMessage,
+ "Unexpected blob file footer size");
+ }
+ uint32_t src_crc = 0;
+ src_crc = crc32c::Value(src.data(), BlobLogFooter::kSize - sizeof(uint32_t));
+ src_crc = crc32c::Mask(src_crc);
+ uint32_t magic_number = 0;
+ if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) ||
+ !GetFixed64(&src, &expiration_range.first) ||
+ !GetFixed64(&src, &expiration_range.second) || !GetFixed32(&src, &crc)) {
+ return Status::Corruption(kErrorMessage, "Error decoding content");
+ }
+ if (magic_number != kMagicNumber) {
+ return Status::Corruption(kErrorMessage, "Magic number mismatch");
+ }
+ if (src_crc != crc) {
+ return Status::Corruption(kErrorMessage, "CRC mismatch");
+ }
+ return Status::OK();
+}
+
+void BlobLogRecord::EncodeHeaderTo(std::string* dst) {
+ assert(dst != nullptr);
+ dst->clear();
+ dst->reserve(BlobLogRecord::kHeaderSize + key.size() + value.size());
+ PutFixed64(dst, key.size());
+ PutFixed64(dst, value.size());
+ PutFixed64(dst, expiration);
+ header_crc = crc32c::Value(dst->c_str(), dst->size());
+ header_crc = crc32c::Mask(header_crc);
+ PutFixed32(dst, header_crc);
+ blob_crc = crc32c::Value(key.data(), key.size());
+ blob_crc = crc32c::Extend(blob_crc, value.data(), value.size());
+ blob_crc = crc32c::Mask(blob_crc);
+ PutFixed32(dst, blob_crc);
+}
+
+Status BlobLogRecord::DecodeHeaderFrom(Slice src) {
+ static const std::string kErrorMessage = "Error while decoding blob record";
+ if (src.size() != BlobLogRecord::kHeaderSize) {
+ return Status::Corruption(kErrorMessage,
+ "Unexpected blob record header size");
+ }
+ uint32_t src_crc = 0;
+ src_crc = crc32c::Value(src.data(), BlobLogRecord::kHeaderSize - 8);
+ src_crc = crc32c::Mask(src_crc);
+ if (!GetFixed64(&src, &key_size) || !GetFixed64(&src, &value_size) ||
+ !GetFixed64(&src, &expiration) || !GetFixed32(&src, &header_crc) ||
+ !GetFixed32(&src, &blob_crc)) {
+ return Status::Corruption(kErrorMessage, "Error decoding content");
+ }
+ if (src_crc != header_crc) {
+ return Status::Corruption(kErrorMessage, "Header CRC mismatch");
+ }
+ return Status::OK();
+}
+
+Status BlobLogRecord::CheckBlobCRC() const {
+ uint32_t expected_crc = 0;
+ expected_crc = crc32c::Value(key.data(), key.size());
+ expected_crc = crc32c::Extend(expected_crc, value.data(), value.size());
+ expected_crc = crc32c::Mask(expected_crc);
+ if (expected_crc != blob_crc) {
+ return Status::Corruption("Blob CRC mismatch");
+ }
+ return Status::OK();
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_format.h b/src/rocksdb/utilities/blob_db/blob_log_format.h
new file mode 100644
index 00000000..fcc042f0
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_format.h
@@ -0,0 +1,125 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Log format information shared by reader and writer.
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <limits>
+#include <memory>
+#include <utility>
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/types.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+constexpr uint32_t kMagicNumber = 2395959; // 0x00248f37
+constexpr uint32_t kVersion1 = 1;
+constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
+
+using ExpirationRange = std::pair<uint64_t, uint64_t>;
+
+// Format of blob log file header (30 bytes):
+//
+// +--------------+---------+---------+-------+-------------+-------------------+
+// | magic number | version | cf id | flags | compression | expiration range |
+// +--------------+---------+---------+-------+-------------+-------------------+
+// | Fixed32 | Fixed32 | Fixed32 | char | char | Fixed64 Fixed64 |
+// +--------------+---------+---------+-------+-------------+-------------------+
+//
+// List of flags:
+// has_ttl: Whether the file contain TTL data.
+//
+// Expiration range in the header is a rough range based on
+// blob_db_options.ttl_range_secs.
+struct BlobLogHeader {
+ static constexpr size_t kSize = 30;
+
+ uint32_t version = kVersion1;
+ uint32_t column_family_id = 0;
+ CompressionType compression = kNoCompression;
+ bool has_ttl = false;
+ ExpirationRange expiration_range = std::make_pair(0, 0);
+
+ void EncodeTo(std::string* dst);
+
+ Status DecodeFrom(Slice slice);
+};
+
+// Format of blob log file footer (32 bytes):
+//
+// +--------------+------------+-------------------+------------+
+// | magic number | blob count | expiration range | footer CRC |
+// +--------------+------------+-------------------+------------+
+// | Fixed32 | Fixed64 | Fixed64 + Fixed64 | Fixed32 |
+// +--------------+------------+-------------------+------------+
+//
+// The footer will be presented only when the blob file is properly closed.
+//
+// Unlike the same field in file header, expiration range in the footer is the
+// range of smallest and largest expiration of the data in this file.
+struct BlobLogFooter {
+ static constexpr size_t kSize = 32;
+
+ uint64_t blob_count = 0;
+ ExpirationRange expiration_range = std::make_pair(0, 0);
+ uint32_t crc = 0;
+
+ void EncodeTo(std::string* dst);
+
+ Status DecodeFrom(Slice slice);
+};
+
+// Blob record format (32 bytes header + key + value):
+//
+// +------------+--------------+------------+------------+----------+---------+-----------+
+// | key length | value length | expiration | header CRC | blob CRC | key | value |
+// +------------+--------------+------------+------------+----------+---------+-----------+
+// | Fixed64 | Fixed64 | Fixed64 | Fixed32 | Fixed32 | key len | value len |
+// +------------+--------------+------------+------------+----------+---------+-----------+
+//
+// If file has has_ttl = false, expiration field is always 0, and the blob
+// doesn't has expiration.
+//
+// Also note that if compression is used, value is compressed value and value
+// length is compressed value length.
+//
+// Header CRC is the checksum of (key_len + val_len + expiration), while
+// blob CRC is the checksum of (key + value).
+//
+// We could use variable length encoding (Varint64) to save more space, but it
+// make reader more complicated.
+struct BlobLogRecord {
+ // header include fields up to blob CRC
+ static constexpr size_t kHeaderSize = 32;
+
+ uint64_t key_size = 0;
+ uint64_t value_size = 0;
+ uint64_t expiration = 0;
+ uint32_t header_crc = 0;
+ uint32_t blob_crc = 0;
+ Slice key;
+ Slice value;
+ std::unique_ptr<char[]> key_buf;
+ std::unique_ptr<char[]> value_buf;
+
+ uint64_t record_size() const { return kHeaderSize + key_size + value_size; }
+
+ void EncodeHeaderTo(std::string* dst);
+
+ Status DecodeHeaderFrom(Slice src);
+
+ Status CheckBlobCRC() const;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_reader.cc b/src/rocksdb/utilities/blob_db/blob_log_reader.cc
new file mode 100644
index 00000000..8ffcc2fa
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_reader.cc
@@ -0,0 +1,105 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_log_reader.h"
+
+#include <algorithm>
+
+#include "monitoring/statistics.h"
+#include "util/file_reader_writer.h"
+#include "util/stop_watch.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+Reader::Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
+ Statistics* statistics)
+ : file_(std::move(file_reader)),
+ env_(env),
+ statistics_(statistics),
+ buffer_(),
+ next_byte_(0) {}
+
+Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
+ StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+ Status s = file_->Read(next_byte_, static_cast<size_t>(size), slice, buf);
+ next_byte_ += size;
+ if (!s.ok()) {
+ return s;
+ }
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
+ if (slice->size() != size) {
+ return Status::Corruption("EOF reached while reading record");
+ }
+ return s;
+}
+
+Status Reader::ReadHeader(BlobLogHeader* header) {
+ assert(file_.get() != nullptr);
+ assert(next_byte_ == 0);
+ Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if (buffer_.size() != BlobLogHeader::kSize) {
+ return Status::Corruption("EOF reached before file header");
+ }
+
+ return header->DecodeFrom(buffer_);
+}
+
+Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
+ uint64_t* blob_offset) {
+ Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
+ if (!s.ok()) {
+ return s;
+ }
+ if (buffer_.size() != BlobLogRecord::kHeaderSize) {
+ return Status::Corruption("EOF reached before record header");
+ }
+
+ s = record->DecodeHeaderFrom(buffer_);
+ if (!s.ok()) {
+ return s;
+ }
+
+ uint64_t kb_size = record->key_size + record->value_size;
+ if (blob_offset != nullptr) {
+ *blob_offset = next_byte_ + record->key_size;
+ }
+
+ switch (level) {
+ case kReadHeader:
+ next_byte_ += kb_size;
+ break;
+
+ case kReadHeaderKey:
+ record->key_buf.reset(new char[record->key_size]);
+ s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
+ next_byte_ += record->value_size;
+ break;
+
+ case kReadHeaderKeyBlob:
+ record->key_buf.reset(new char[record->key_size]);
+ s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
+ if (s.ok()) {
+ record->value_buf.reset(new char[record->value_size]);
+ s = ReadSlice(record->value_size, &record->value,
+ record->value_buf.get());
+ }
+ if (s.ok()) {
+ s = record->CheckBlobCRC();
+ }
+ break;
+ }
+ return s;
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_reader.h b/src/rocksdb/utilities/blob_db/blob_log_reader.h
new file mode 100644
index 00000000..45e2e955
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_reader.h
@@ -0,0 +1,83 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+#include "util/file_reader_writer.h"
+#include "utilities/blob_db/blob_log_format.h"
+
+namespace rocksdb {
+
+class SequentialFileReader;
+class Logger;
+
+namespace blob_db {
+
+/**
+ * Reader is a general purpose log stream reader implementation. The actual job
+ * of reading from the device is implemented by the SequentialFile interface.
+ *
+ * Please see Writer for details on the file and record layout.
+ */
+class Reader {
+ public:
+ enum ReadLevel {
+ kReadHeader,
+ kReadHeaderKey,
+ kReadHeaderKeyBlob,
+ };
+
+ // Create a reader that will return log records from "*file".
+ // "*file" must remain live while this Reader is in use.
+ Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
+ Statistics* statistics);
+
+ ~Reader() = default;
+
+ // No copying allowed
+ Reader(const Reader&) = delete;
+ Reader& operator=(const Reader&) = delete;
+
+ Status ReadHeader(BlobLogHeader* header);
+
+ // Read the next record into *record. Returns true if read
+ // successfully, false if we hit end of the input. May use
+ // "*scratch" as temporary storage. The contents filled in *record
+ // will only be valid until the next mutating operation on this
+ // reader or the next mutation to *scratch.
+ // If blob_offset is non-null, return offset of the blob through it.
+ Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
+ uint64_t* blob_offset = nullptr);
+
+ void ResetNextByte() { next_byte_ = 0; }
+
+ uint64_t GetNextByte() const { return next_byte_; }
+
+ private:
+ Status ReadSlice(uint64_t size, Slice* slice, char* buf);
+
+ const std::unique_ptr<RandomAccessFileReader> file_;
+ Env* env_;
+ Statistics* statistics_;
+
+ Slice buffer_;
+ char header_buf_[BlobLogRecord::kHeaderSize];
+
+ // which byte to read next. For asserting proper usage
+ uint64_t next_byte_;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_writer.cc b/src/rocksdb/utilities/blob_db/blob_log_writer.cc
new file mode 100644
index 00000000..51578c5c
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_writer.cc
@@ -0,0 +1,139 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_log_writer.h"
+
+#include <cstdint>
+#include <string>
+
+#include "monitoring/statistics.h"
+#include "rocksdb/env.h"
+#include "util/coding.h"
+#include "util/file_reader_writer.h"
+#include "util/stop_watch.h"
+#include "utilities/blob_db/blob_log_format.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
+ Statistics* statistics, uint64_t log_number, uint64_t bpsync,
+ bool use_fs, uint64_t boffset)
+ : dest_(std::move(dest)),
+ env_(env),
+ statistics_(statistics),
+ log_number_(log_number),
+ block_offset_(boffset),
+ bytes_per_sync_(bpsync),
+ next_sync_offset_(0),
+ use_fsync_(use_fs),
+ last_elem_type_(kEtNone) {}
+
+Status Writer::Sync() {
+ StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
+ Status s = dest_->Sync(use_fsync_);
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
+ return s;
+}
+
+Status Writer::WriteHeader(BlobLogHeader& header) {
+ assert(block_offset_ == 0);
+ assert(last_elem_type_ == kEtNone);
+ std::string str;
+ header.EncodeTo(&str);
+
+ Status s = dest_->Append(Slice(str));
+ if (s.ok()) {
+ block_offset_ += str.size();
+ s = dest_->Flush();
+ }
+ last_elem_type_ = kEtFileHdr;
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
+ BlobLogHeader::kSize);
+ return s;
+}
+
+Status Writer::AppendFooter(BlobLogFooter& footer) {
+ assert(block_offset_ != 0);
+ assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
+
+ std::string str;
+ footer.EncodeTo(&str);
+
+ Status s = dest_->Append(Slice(str));
+ if (s.ok()) {
+ block_offset_ += str.size();
+ s = dest_->Close();
+ dest_.reset();
+ }
+
+ last_elem_type_ = kEtFileFooter;
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
+ BlobLogFooter::kSize);
+ return s;
+}
+
+Status Writer::AddRecord(const Slice& key, const Slice& val,
+ uint64_t expiration, uint64_t* key_offset,
+ uint64_t* blob_offset) {
+ assert(block_offset_ != 0);
+ assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
+
+ std::string buf;
+ ConstructBlobHeader(&buf, key, val, expiration);
+
+ Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
+ return s;
+}
+
+Status Writer::AddRecord(const Slice& key, const Slice& val,
+ uint64_t* key_offset, uint64_t* blob_offset) {
+ assert(block_offset_ != 0);
+ assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
+
+ std::string buf;
+ ConstructBlobHeader(&buf, key, val, 0);
+
+ Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset);
+ return s;
+}
+
+void Writer::ConstructBlobHeader(std::string* buf, const Slice& key,
+ const Slice& val, uint64_t expiration) {
+ BlobLogRecord record;
+ record.key = key;
+ record.value = val;
+ record.expiration = expiration;
+ record.EncodeHeaderTo(buf);
+}
+
+Status Writer::EmitPhysicalRecord(const std::string& headerbuf,
+ const Slice& key, const Slice& val,
+ uint64_t* key_offset, uint64_t* blob_offset) {
+ StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
+ Status s = dest_->Append(Slice(headerbuf));
+ if (s.ok()) {
+ s = dest_->Append(key);
+ }
+ if (s.ok()) {
+ s = dest_->Append(val);
+ }
+ if (s.ok()) {
+ s = dest_->Flush();
+ }
+
+ *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
+ *blob_offset = *key_offset + key.size();
+ block_offset_ = *blob_offset + val.size();
+ last_elem_type_ = kEtRecord;
+ RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
+ BlobLogRecord::kHeaderSize + key.size() + val.size());
+ return s;
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_writer.h b/src/rocksdb/utilities/blob_db/blob_log_writer.h
new file mode 100644
index 00000000..dccac355
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_writer.h
@@ -0,0 +1,95 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+#include "rocksdb/types.h"
+#include "utilities/blob_db/blob_log_format.h"
+
+namespace rocksdb {
+
+class WritableFileWriter;
+
+namespace blob_db {
+
+/**
+ * Writer is the blob log stream writer. It provides an append-only
+ * abstraction for writing blob data.
+ *
+ *
+ * Look at blob_db_format.h to see the details of the record formats.
+ */
+
+class Writer {
+ public:
+ // Create a writer that will append data to "*dest".
+ // "*dest" must be initially empty.
+ // "*dest" must remain live while this Writer is in use.
+ Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
+ Statistics* statistics, uint64_t log_number, uint64_t bpsync,
+ bool use_fsync, uint64_t boffset = 0);
+
+ ~Writer() = default;
+
+ // No copying allowed
+ Writer(const Writer&) = delete;
+ Writer& operator=(const Writer&) = delete;
+
+ static void ConstructBlobHeader(std::string* buf, const Slice& key,
+ const Slice& val, uint64_t expiration);
+
+ Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
+ uint64_t* blob_offset);
+
+ Status AddRecord(const Slice& key, const Slice& val, uint64_t expiration,
+ uint64_t* key_offset, uint64_t* blob_offset);
+
+ Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key,
+ const Slice& val, uint64_t* key_offset,
+ uint64_t* blob_offset);
+
+ Status AppendFooter(BlobLogFooter& footer);
+
+ Status WriteHeader(BlobLogHeader& header);
+
+ WritableFileWriter* file() { return dest_.get(); }
+
+ const WritableFileWriter* file() const { return dest_.get(); }
+
+ uint64_t get_log_number() const { return log_number_; }
+
+ bool ShouldSync() const { return block_offset_ > next_sync_offset_; }
+
+ Status Sync();
+
+ void ResetSyncPointer() { next_sync_offset_ += bytes_per_sync_; }
+
+ private:
+ std::unique_ptr<WritableFileWriter> dest_;
+ Env* env_;
+ Statistics* statistics_;
+ uint64_t log_number_;
+ uint64_t block_offset_; // Current offset in block
+ uint64_t bytes_per_sync_;
+ uint64_t next_sync_offset_;
+ bool use_fsync_;
+
+ public:
+ enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
+ ElemType last_elem_type_;
+};
+
+} // namespace blob_db
+} // namespace rocksdb
+#endif // ROCKSDB_LITE