21 files changed, 6578 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
new file mode 100644
index 00000000..f145d9a9
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
@@ -0,0 +1,114 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_compaction_filter.h"
+#include "db/dbformat.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+namespace {
+
+// CompactionFilter to delete expired blob indexes from the base DB.
+class BlobIndexCompactionFilter : public CompactionFilter {
+ public:
+  BlobIndexCompactionFilter(BlobCompactionContext context,
+                            uint64_t current_time, Statistics* statistics)
+      : context_(context),
+        current_time_(current_time),
+        statistics_(statistics) {}
+
+  ~BlobIndexCompactionFilter() override {
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
+    RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
+  }
+
+  const char* Name() const override { return "BlobIndexCompactionFilter"; }
+
+  // Filter expired blob indexes regardless of snapshots.
+  bool IgnoreSnapshots() const override { return true; }
+
+  Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
+                    const Slice& value, std::string* /*new_value*/,
+                    std::string* /*skip_until*/) const override {
+    if (value_type != kBlobIndex) {
+      return Decision::kKeep;
+    }
+    BlobIndex blob_index;
+    Status s = blob_index.DecodeFrom(value);
+    if (!s.ok()) {
+      // Unable to decode blob index. Keep the value.
+      return Decision::kKeep;
+    }
+    if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
+      // Expired.
+      expired_count_++;
+      expired_size_ += key.size() + value.size();
+      return Decision::kRemove;
+    }
+    if (!blob_index.IsInlined() &&
+        blob_index.file_number() < context_.next_file_number &&
+        context_.current_blob_files.count(blob_index.file_number()) == 0) {
+      // The corresponding blob file is gone. It could have been garbage
+      // collected or evicted by FIFO eviction.
+      evicted_count_++;
+      evicted_size_ += key.size() + value.size();
+      return Decision::kRemove;
+    }
+    if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
+        blob_index.expiration() < context_.evict_expiration_up_to) {
+      // Hack: the internal key is passed to BlobIndexCompactionFilter so that
+      // it can extract the sequence number.
+      ParsedInternalKey ikey;
+      bool ok = ParseInternalKey(key, &ikey);
+      // Remove keys that could have been removed by the last FIFO eviction.
+      // If we get an error while parsing the key, ignore it and continue.
+      if (ok && ikey.sequence < context_.fifo_eviction_seq) {
+        evicted_count_++;
+        evicted_size_ += key.size() + value.size();
+        return Decision::kRemove;
+      }
+    }
+    return Decision::kKeep;
+  }
+
+ private:
+  BlobCompactionContext context_;
+  const uint64_t current_time_;
+  Statistics* statistics_;
+  // It is safe not to use std::atomic here since the compaction filter,
+  // created from a compaction filter factory, will not be called from
+  // multiple threads.
+  mutable uint64_t expired_count_ = 0;
+  mutable uint64_t expired_size_ = 0;
+  mutable uint64_t evicted_count_ = 0;
+  mutable uint64_t evicted_size_ = 0;
+};
+
+}  // anonymous namespace
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactory::CreateCompactionFilter(
+    const CompactionFilter::Context& /*context*/) {
+  int64_t current_time = 0;
+  Status s = env_->GetCurrentTime(&current_time);
+  if (!s.ok()) {
+    return nullptr;
+  }
+  assert(current_time >= 0);
+
+  BlobCompactionContext context;
+  blob_db_impl_->GetCompactionContext(&context);
+
+  return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
+      context, static_cast<uint64_t>(current_time), statistics_));
+}
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.h b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
new file mode 100644
index 00000000..7a8ea613
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
@@ -0,0 +1,47 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <unordered_set>
+
+#include "monitoring/statistics.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "utilities/blob_db/blob_db_impl.h"
+#include "utilities/blob_db/blob_index.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+struct BlobCompactionContext {
+  uint64_t next_file_number;
+  std::unordered_set<uint64_t> current_blob_files;
+  SequenceNumber fifo_eviction_seq;
+  uint64_t evict_expiration_up_to;
+};
+
+class BlobIndexCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+  BlobIndexCompactionFilterFactory(BlobDBImpl* blob_db_impl, Env* env,
+                                   Statistics* statistics)
+      : blob_db_impl_(blob_db_impl), env_(env), statistics_(statistics) {}
+
+  virtual const char* Name() const override {
+    return "BlobIndexCompactionFilterFactory";
+  }
+
+  virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+      const CompactionFilter::Context& /*context*/) override;
+
+ private:
+  BlobDBImpl* blob_db_impl_;
+  Env* env_;
+  Statistics* statistics_;
+};
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
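Aside on the hook used here: a CompactionFilterFactory like the one declared above is installed via ColumnFamilyOptions::compaction_filter_factory (BlobDBImpl::Open() later in this patch does exactly that), and RocksDB then consults the filter for each key-value pair it compacts. A minimal standalone sketch of the same mechanism, assuming a hypothetical value layout where an 8-byte little-endian expiration timestamp prefixes the payload (this is not the BlobIndex encoding used by the patch):

#include <cstdint>
#include <cstring>
#include <string>
#include "rocksdb/compaction_filter.h"

// Hypothetical example filter; assumes each value starts with an 8-byte
// little-endian expiration timestamp (NOT the BlobIndex format above).
class ExpiryFilter : public rocksdb::CompactionFilter {
 public:
  explicit ExpiryFilter(uint64_t now) : now_(now) {}
  const char* Name() const override { return "ExpiryFilter"; }
  bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
              const rocksdb::Slice& value, std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    if (value.size() < sizeof(uint64_t)) {
      return false;  // keep malformed values
    }
    uint64_t expiration;
    memcpy(&expiration, value.data(), sizeof(expiration));
    return expiration <= now_;  // returning true drops the entry
  }

 private:
  const uint64_t now_;
};

Returning true from the boolean Filter() API corresponds to Decision::kRemove in the richer FilterV2() API that BlobIndexCompactionFilter overrides.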
diff --git a/src/rocksdb/utilities/blob_db/blob_db.cc b/src/rocksdb/utilities/blob_db/blob_db.cc
new file mode 100644
index 00000000..d660def4
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db.cc
@@ -0,0 +1,103 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "utilities/blob_db/blob_db.h"
+
+#include <inttypes.h>
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+Status BlobDB::Open(const Options& options, const BlobDBOptions& bdb_options,
+                    const std::string& dbname, BlobDB** blob_db) {
+  *blob_db = nullptr;
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  std::vector<ColumnFamilyHandle*> handles;
+  Status s = BlobDB::Open(db_options, bdb_options, dbname, column_families,
+                          &handles, blob_db);
+  if (s.ok()) {
+    assert(handles.size() == 1);
+    // We can delete the handle since DBImpl is always holding a reference to
+    // the default column family.
+    delete handles[0];
+  }
+  return s;
+}
+
+Status BlobDB::Open(const DBOptions& db_options,
+                    const BlobDBOptions& bdb_options, const std::string& dbname,
+                    const std::vector<ColumnFamilyDescriptor>& column_families,
+                    std::vector<ColumnFamilyHandle*>* handles,
+                    BlobDB** blob_db) {
+  if (column_families.size() != 1 ||
+      column_families[0].name != kDefaultColumnFamilyName) {
+    return Status::NotSupported(
+        "Blob DB doesn't support non-default column family.");
+  }
+
+  BlobDBImpl* blob_db_impl = new BlobDBImpl(dbname, bdb_options, db_options,
+                                            column_families[0].options);
+  Status s = blob_db_impl->Open(handles);
+  if (s.ok()) {
+    *blob_db = static_cast<BlobDB*>(blob_db_impl);
+  } else {
+    delete blob_db_impl;
+    *blob_db = nullptr;
+  }
+  return s;
+}
+
+BlobDB::BlobDB() : StackableDB(nullptr) {}
+
+void BlobDBOptions::Dump(Logger* log) const {
+  ROCKS_LOG_HEADER(
+      log, "                          BlobDBOptions.blob_dir: %s",
+      blob_dir.c_str());
+  ROCKS_LOG_HEADER(
+      log, "                     BlobDBOptions.path_relative: %d",
+      path_relative);
+  ROCKS_LOG_HEADER(
+      log, "                           BlobDBOptions.is_fifo: %d",
+      is_fifo);
+  ROCKS_LOG_HEADER(
+      log, "                       BlobDBOptions.max_db_size: %" PRIu64,
+      max_db_size);
+  ROCKS_LOG_HEADER(
+      log, "                    BlobDBOptions.ttl_range_secs: %" PRIu64,
+      ttl_range_secs);
+  ROCKS_LOG_HEADER(
+      log, "                     BlobDBOptions.min_blob_size: %" PRIu64,
+      min_blob_size);
+  ROCKS_LOG_HEADER(
+      log, "                    BlobDBOptions.bytes_per_sync: %" PRIu64,
+      bytes_per_sync);
+  ROCKS_LOG_HEADER(
+      log, "                    BlobDBOptions.blob_file_size: %" PRIu64,
+      blob_file_size);
+  ROCKS_LOG_HEADER(
+      log, "                       BlobDBOptions.compression: %d",
+      static_cast<int>(compression));
+  ROCKS_LOG_HEADER(
+      log, "         BlobDBOptions.enable_garbage_collection: %d",
+      enable_garbage_collection);
+  ROCKS_LOG_HEADER(
+      log, "          BlobDBOptions.disable_background_tasks: %d",
+      disable_background_tasks);
+}
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif
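For orientation, a hedged sketch of how the simple Open() overload above is meant to be called (paths and option values are hypothetical; note the header still lives under utilities/, as the comment in blob_db.h below points out):

#include <cassert>
#include "utilities/blob_db/blob_db.h"  // internal header; not yet public API

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  rocksdb::blob_db::BlobDBOptions bdb_options;
  bdb_options.min_blob_size = 256;  // values >= 256 bytes go to blob files

  rocksdb::blob_db::BlobDB* db = nullptr;
  rocksdb::Status s = rocksdb::blob_db::BlobDB::Open(
      options, bdb_options, "/tmp/blobdb_example", &db);
  assert(s.ok());

  s = db->Put(rocksdb::WriteOptions(), "key", "value");
  assert(s.ok());
  delete db;  // the destructor Close()s the base DB and syncs blob files
  return 0;
}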
diff --git a/src/rocksdb/utilities/blob_db/blob_db.h b/src/rocksdb/utilities/blob_db/blob_db.h
new file mode 100644
index 00000000..3beb74fc
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db.h
@@ -0,0 +1,227 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <functional>
+#include <string>
+#include <vector>
+#include "rocksdb/db.h"
+#include "rocksdb/status.h"
+#include "rocksdb/utilities/stackable_db.h"
+
+namespace rocksdb {
+
+namespace blob_db {
+
+// A wrapped database which puts the values of KV pairs in a separate log
+// and stores the location within the log in the underlying DB.
+// It lacks a lot of important functionality, e.g. DB restarts,
+// garbage collection, iterators, etc.
+//
+// The factory needs to be moved to include/rocksdb/utilities to allow
+// users to use blob DB.
+
+struct BlobDBOptions {
+  // Name of the directory under the main DB where blobs will be stored.
+  // Default is "blob_dir".
+  std::string blob_dir = "blob_dir";
+
+  // Whether the blob_dir path is relative or absolute.
+  bool path_relative = true;
+
+  // When max_db_size is reached, evict blob files to free up space
+  // instead of returning a NoSpace error on write. Blob files will be
+  // evicted from oldest to newest, based on file creation time.
+  bool is_fifo = false;
+
+  // Maximum size of the database (including SST files and blob files).
+  //
+  // Default: 0 (no limits)
+  uint64_t max_db_size = 0;
+
+  // A new bucket is opened for each ttl_range. So if ttl_range_secs is 600
+  // seconds (10 minutes), and the first bucket starts at 1471542000,
+  // then the blob buckets will be:
+  // first bucket:  1471542000 - 1471542600
+  // second bucket: 1471542600 - 1471543200
+  // and so on.
+  uint64_t ttl_range_secs = 3600;
+
+  // The smallest value to store in the blob log. Values smaller than this
+  // threshold will be inlined in the base DB together with the key.
+  uint64_t min_blob_size = 0;
+
+  // Allows the OS to incrementally sync blob files to disk for every
+  // bytes_per_sync bytes written. Users shouldn't rely on it for a
+  // persistence guarantee.
+  uint64_t bytes_per_sync = 512 * 1024;
+
+  // The target size of each blob file. A file becomes immutable
+  // after it exceeds that size.
+  uint64_t blob_file_size = 256 * 1024 * 1024;
+
+  // What compression to use for blobs.
+  CompressionType compression = kNoCompression;
+
+  // If enabled, blob DB periodically cleans up stale data by rewriting the
+  // remaining live data in blob files to new files. If garbage collection
+  // is not enabled, blob files will be cleaned up based on TTL.
+  bool enable_garbage_collection = false;
+
+  // Disable all background jobs. For testing only.
+  bool disable_background_tasks = false;
+
+  void Dump(Logger* log) const;
+};
+
+class BlobDB : public StackableDB {
+ public:
+  using rocksdb::StackableDB::Put;
+  virtual Status Put(const WriteOptions& options, const Slice& key,
+                     const Slice& value) override = 0;
+  virtual Status Put(const WriteOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     const Slice& value) override {
+    if (column_family != DefaultColumnFamily()) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    return Put(options, key, value);
+  }
+
+  using rocksdb::StackableDB::Delete;
+  virtual Status Delete(const WriteOptions& options,
+                        ColumnFamilyHandle* column_family,
+                        const Slice& key) override {
+    if (column_family != DefaultColumnFamily()) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    assert(db_ != nullptr);
+    return db_->Delete(options, column_family, key);
+  }
+
+  virtual Status PutWithTTL(const WriteOptions& options, const Slice& key,
+                            const Slice& value, uint64_t ttl) = 0;
+  virtual Status PutWithTTL(const WriteOptions& options,
+                            ColumnFamilyHandle* column_family, const Slice& key,
+                            const Slice& value, uint64_t ttl) {
+    if (column_family != DefaultColumnFamily()) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    return PutWithTTL(options, key, value, ttl);
+  }
+
+  // Put with expiration. A key with expiration time equal to
+  // std::numeric_limits<uint64_t>::max() means the key never expires.
+  virtual Status PutUntil(const WriteOptions& options, const Slice& key,
+                          const Slice& value, uint64_t expiration) = 0;
+  virtual Status PutUntil(const WriteOptions& options,
+                          ColumnFamilyHandle* column_family, const Slice& key,
+                          const Slice& value, uint64_t expiration) {
+    if (column_family != DefaultColumnFamily()) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    return PutUntil(options, key, value, expiration);
+  }
+
+  using rocksdb::StackableDB::Get;
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value) override = 0;
+
+  // Get value and expiration.
+  virtual Status Get(const ReadOptions& options,
+                     ColumnFamilyHandle* column_family, const Slice& key,
+                     PinnableSlice* value, uint64_t* expiration) = 0;
+  virtual Status Get(const ReadOptions& options, const Slice& key,
+                     PinnableSlice* value, uint64_t* expiration) {
+    return Get(options, DefaultColumnFamily(), key, value, expiration);
+  }
+
+  using rocksdb::StackableDB::MultiGet;
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& options,
+      const std::vector<Slice>& keys,
+      std::vector<std::string>* values) override = 0;
+  virtual std::vector<Status> MultiGet(
+      const ReadOptions& options,
+      const std::vector<ColumnFamilyHandle*>& column_families,
+      const std::vector<Slice>& keys,
+      std::vector<std::string>* values) override {
+    for (auto column_family : column_families) {
+      if (column_family != DefaultColumnFamily()) {
+        return std::vector<Status>(
+            column_families.size(),
+            Status::NotSupported(
+                "Blob DB doesn't support non-default column family."));
+      }
+    }
+    return MultiGet(options, keys, values);
+  }
+
+  using rocksdb::StackableDB::SingleDelete;
+  virtual Status SingleDelete(const WriteOptions& /*wopts*/,
+                              ColumnFamilyHandle* /*column_family*/,
+                              const Slice& /*key*/) override {
+    return Status::NotSupported("Not supported operation in blob db.");
+  }
+
+  using rocksdb::StackableDB::Merge;
+  virtual Status Merge(const WriteOptions& /*options*/,
+                       ColumnFamilyHandle* /*column_family*/,
+                       const Slice& /*key*/, const Slice& /*value*/) override {
+    return Status::NotSupported("Not supported operation in blob db.");
+  }
+
+  virtual Status Write(const WriteOptions& opts,
+                       WriteBatch* updates) override = 0;
+
+  using rocksdb::StackableDB::NewIterator;
+  virtual Iterator* NewIterator(const ReadOptions& options) override = 0;
+  virtual Iterator* NewIterator(const ReadOptions& options,
+                                ColumnFamilyHandle* column_family) override {
+    if (column_family != DefaultColumnFamily()) {
+      // Blob DB doesn't support non-default column family.
+      return nullptr;
+    }
+    return NewIterator(options);
+  }
+
+  using rocksdb::StackableDB::Close;
+  virtual Status Close() override = 0;
+
+  // Open a blob DB.
+  static Status Open(const Options& options, const BlobDBOptions& bdb_options,
+                     const std::string& dbname, BlobDB** blob_db);
+
+  static Status Open(const DBOptions& db_options,
+                     const BlobDBOptions& bdb_options,
+                     const std::string& dbname,
+                     const std::vector<ColumnFamilyDescriptor>& column_families,
+                     std::vector<ColumnFamilyHandle*>* handles,
+                     BlobDB** blob_db);
+
+  virtual BlobDBOptions GetBlobDBOptions() const = 0;
+
+  virtual Status SyncBlobFiles() = 0;
+
+  virtual ~BlobDB() {}
+
+ protected:
+  explicit BlobDB();
+};
+
+// Destroy the content of the database.
+Status DestroyBlobDB(const std::string& dbname, const Options& options,
+                     const BlobDBOptions& bdb_options);
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
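The PutWithTTL()/PutUntil()/Get() declarations above are the TTL surface of the API; a short usage sketch under the same assumptions as the earlier Open() example:

#include <limits>
#include "utilities/blob_db/blob_db.h"

// Hypothetical helper; `db` is an open BlobDB* as in the earlier sketch.
void TtlExamples(rocksdb::blob_db::BlobDB* db) {
  // Relative TTL: expiration is computed as now + 600 seconds.
  db->PutWithTTL(rocksdb::WriteOptions(), "session:42", "payload", 600);

  // Absolute expiration; max() means the key never expires.
  db->PutUntil(rocksdb::WriteOptions(), "lease:7", "payload",
               std::numeric_limits<uint64_t>::max());

  // Read the value back together with its expiration.
  rocksdb::PinnableSlice value;
  uint64_t expiration = 0;
  db->Get(rocksdb::ReadOptions(), db->DefaultColumnFamily(), "session:42",
          &value, &expiration);
}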
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.cc b/src/rocksdb/utilities/blob_db/blob_db_impl.cc
new file mode 100644
index 00000000..5dcddc21
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl.cc
@@ -0,0 +1,1922 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db_impl.h"
+#include <algorithm>
+#include <cinttypes>
+#include <iomanip>
+#include <memory>
+
+#include "db/db_impl.h"
+#include "db/write_batch_internal.h"
+#include "monitoring/instrumented_mutex.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/utilities/stackable_db.h"
+#include "rocksdb/utilities/transaction.h"
+#include "table/block.h"
+#include "table/block_based_table_builder.h"
+#include "table/block_builder.h"
+#include "table/meta_blocks.h"
+#include "util/cast_util.h"
+#include "util/crc32c.h"
+#include "util/file_reader_writer.h"
+#include "util/file_util.h"
+#include "util/filename.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+#include "util/random.h"
+#include "util/sst_file_manager_impl.h"
+#include "util/stop_watch.h"
+#include "util/sync_point.h"
+#include "util/timer_queue.h"
+#include "utilities/blob_db/blob_compaction_filter.h"
+#include "utilities/blob_db/blob_db_iterator.h"
+#include "utilities/blob_db/blob_db_listener.h"
+#include "utilities/blob_db/blob_index.h"
+
+namespace {
+int kBlockBasedTableVersionFormat = 2;
+}  // end namespace
+
+namespace rocksdb {
+namespace blob_db {
+
+bool BlobFileComparator::operator()(
+    const std::shared_ptr<BlobFile>& lhs,
+    const std::shared_ptr<BlobFile>& rhs) const {
+  return lhs->BlobFileNumber() > rhs->BlobFileNumber();
+}
+
+bool BlobFileComparatorTTL::operator()(
+    const std::shared_ptr<BlobFile>& lhs,
+    const std::shared_ptr<BlobFile>& rhs) const {
+  assert(lhs->HasTTL() && rhs->HasTTL());
+  if (lhs->expiration_range_.first < rhs->expiration_range_.first) {
+    return true;
+  }
+  if (lhs->expiration_range_.first > rhs->expiration_range_.first) {
+    return false;
+  }
+  return lhs->BlobFileNumber() < rhs->BlobFileNumber();
+}
+
+BlobDBImpl::BlobDBImpl(const std::string& dbname,
+                       const BlobDBOptions& blob_db_options,
+                       const DBOptions& db_options,
+                       const ColumnFamilyOptions& cf_options)
+    : BlobDB(),
+      dbname_(dbname),
+      db_impl_(nullptr),
+      env_(db_options.env),
+      bdb_options_(blob_db_options),
+      db_options_(db_options),
+      cf_options_(cf_options),
+      env_options_(db_options),
+      statistics_(db_options_.statistics.get()),
+      next_file_number_(1),
+      epoch_of_(0),
+      closed_(true),
+      open_file_count_(0),
+      total_blob_size_(0),
+      live_sst_size_(0),
+      fifo_eviction_seq_(0),
+      evict_expiration_up_to_(0),
+      debug_level_(0) {
+  blob_dir_ = (bdb_options_.path_relative)
+                  ? dbname + "/" + bdb_options_.blob_dir
+                  : bdb_options_.blob_dir;
+  env_options_.bytes_per_sync = blob_db_options.bytes_per_sync;
+}
+
+BlobDBImpl::~BlobDBImpl() {
+  tqueue_.shutdown();
+  // CancelAllBackgroundWork(db_, true);
+  Status s __attribute__((__unused__)) = Close();
+  assert(s.ok());
+}
+
+Status BlobDBImpl::Close() {
+  if (closed_) {
+    return Status::OK();
+  }
+  closed_ = true;
+
+  // Close the base DB before BlobDBImpl destructs to stop event listener and
+  // compaction filter calls.
+  Status s = db_->Close();
+  // Delete db_ even if Close() failed.
+  delete db_;
+  // Reset the pointers to avoid StackableDB deleting them again.
+  db_ = nullptr;
+  db_impl_ = nullptr;
+  if (!s.ok()) {
+    return s;
+  }
+
+  s = SyncBlobFiles();
+  return s;
+}
+
+BlobDBOptions BlobDBImpl::GetBlobDBOptions() const { return bdb_options_; }
+
+Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
+  assert(handles != nullptr);
+  assert(db_ == nullptr);
+  if (blob_dir_.empty()) {
+    return Status::NotSupported("No blob directory in options");
+  }
+  if (cf_options_.compaction_filter != nullptr ||
+      cf_options_.compaction_filter_factory != nullptr) {
+    return Status::NotSupported("Blob DB doesn't support compaction filter.");
+  }
+
+  Status s;
+
+  // Create info log.
+  if (db_options_.info_log == nullptr) {
+    s = CreateLoggerFromOptions(dbname_, db_options_, &db_options_.info_log);
+    if (!s.ok()) {
+      return s;
+    }
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log, "Opening BlobDB...");
+
+  // Open blob directory.
+  s = env_->CreateDirIfMissing(blob_dir_);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to create blob_dir %s, status: %s",
+                    blob_dir_.c_str(), s.ToString().c_str());
+  }
+  s = env_->NewDirectory(blob_dir_, &dir_ent_);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to open blob_dir %s, status: %s", blob_dir_.c_str(),
+                    s.ToString().c_str());
+    return s;
+  }
+
+  // Open blob files.
+  s = OpenAllBlobFiles();
+  if (!s.ok()) {
+    return s;
+  }
+
+  // Update options.
+  db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
+  cf_options_.compaction_filter_factory.reset(
+      new BlobIndexCompactionFilterFactory(this, env_, statistics_));
+
+  // Open base db.
+  ColumnFamilyDescriptor cf_descriptor(kDefaultColumnFamilyName, cf_options_);
+  s = DB::Open(db_options_, dbname_, {cf_descriptor}, handles, &db_);
+  if (!s.ok()) {
+    return s;
+  }
+  db_impl_ = static_cast_with_check<DBImpl, DB>(db_->GetRootDB());
+
+  // Add trash files in blob dir to the file delete scheduler.
+  SstFileManagerImpl* sfm = static_cast<SstFileManagerImpl*>(
+      db_impl_->immutable_db_options().sst_file_manager.get());
+  DeleteScheduler::CleanupDirectory(env_, sfm, blob_dir_);
+
+  UpdateLiveSSTSize();
+
+  // Start background jobs.
+  if (!bdb_options_.disable_background_tasks) {
+    StartBackgroundTasks();
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log, "BlobDB pointer %p", this);
+  bdb_options_.Dump(db_options_.info_log.get());
+  closed_ = false;
+  return s;
+}
+
+void BlobDBImpl::StartBackgroundTasks() {
+  // Store a call to a member function and object.
+  tqueue_.add(
+      kReclaimOpenFilesPeriodMillisecs,
+      std::bind(&BlobDBImpl::ReclaimOpenFiles, this, std::placeholders::_1));
+  tqueue_.add(
+      kDeleteObsoleteFilesPeriodMillisecs,
+      std::bind(&BlobDBImpl::DeleteObsoleteFiles, this, std::placeholders::_1));
+  tqueue_.add(kSanityCheckPeriodMillisecs,
+              std::bind(&BlobDBImpl::SanityCheck, this, std::placeholders::_1));
+  tqueue_.add(
+      kEvictExpiredFilesPeriodMillisecs,
+      std::bind(&BlobDBImpl::EvictExpiredFiles, this, std::placeholders::_1));
+}
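The std::bind pattern above packages a member function plus `this` into a callable the timer queue can invoke. Judging by the callbacks defined later in this file (SanityCheck, EvictExpiredFiles, ReclaimOpenFiles), the contract appears to be: the callback receives an `aborted` flag and returns std::pair<bool, int64_t>, where the bool requests rescheduling (these callbacks always pass -1 as the second member). A distilled sketch of that pattern:

#include <cstdint>
#include <functional>
#include <utility>

struct Worker {
  // Same shape as BlobDBImpl::SanityCheck(bool aborted).
  std::pair<bool, int64_t> Tick(bool aborted) {
    if (aborted) {
      return std::make_pair(false, -1);  // do not reschedule
    }
    // ... periodic work would go here ...
    return std::make_pair(true, -1);  // reschedule
  }
};

int main() {
  Worker w;
  std::function<std::pair<bool, int64_t>(bool)> cb =
      std::bind(&Worker::Tick, &w, std::placeholders::_1);
  cb(false);  // what the timer queue does on each period
  return 0;
}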
+Status BlobDBImpl::GetAllBlobFiles(std::set<uint64_t>* file_numbers) {
+  assert(file_numbers != nullptr);
+  std::vector<std::string> all_files;
+  Status s = env_->GetChildren(blob_dir_, &all_files);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to get list of blob files, status: %s",
+                    s.ToString().c_str());
+    return s;
+  }
+
+  for (const auto& file_name : all_files) {
+    uint64_t file_number;
+    FileType type;
+    bool success = ParseFileName(file_name, &file_number, &type);
+    if (success && type == kBlobFile) {
+      file_numbers->insert(file_number);
+    } else {
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "Skipping file in blob directory: %s", file_name.c_str());
+    }
+  }
+
+  return s;
+}
+
+Status BlobDBImpl::OpenAllBlobFiles() {
+  std::set<uint64_t> file_numbers;
+  Status s = GetAllBlobFiles(&file_numbers);
+  if (!s.ok()) {
+    return s;
+  }
+
+  if (!file_numbers.empty()) {
+    next_file_number_.store(*file_numbers.rbegin() + 1);
+  }
+
+  std::string blob_file_list;
+  std::string obsolete_file_list;
+
+  for (auto& file_number : file_numbers) {
+    std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
+        this, blob_dir_, file_number, db_options_.info_log.get());
+    blob_file->MarkImmutable();
+
+    // Read file header and footer.
+    Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
+    if (read_metadata_status.IsCorruption()) {
+      // Remove incomplete file.
+      ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
+      if (!obsolete_file_list.empty()) {
+        obsolete_file_list.append(", ");
+      }
+      obsolete_file_list.append(ToString(file_number));
+      continue;
+    } else if (!read_metadata_status.ok()) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Unable to read metadata of blob file %" PRIu64
+                      ", status: '%s'",
+                      file_number, read_metadata_status.ToString().c_str());
+      return read_metadata_status;
+    }
+
+    total_blob_size_ += blob_file->GetFileSize();
+
+    blob_files_[file_number] = blob_file;
+    if (!blob_file_list.empty()) {
+      blob_file_list.append(", ");
+    }
+    blob_file_list.append(ToString(file_number));
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
+                 blob_file_list.c_str());
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "Found %" ROCKSDB_PRIszt
+                 " incomplete or corrupted blob files: %s",
+                 obsolete_files_.size(), obsolete_file_list.c_str());
+  return s;
+}
+
+void BlobDBImpl::CloseRandomAccessLocked(
+    const std::shared_ptr<BlobFile>& bfile) {
+  bfile->CloseRandomAccessLocked();
+  open_file_count_--;
+}
+
+Status BlobDBImpl::GetBlobFileReader(
+    const std::shared_ptr<BlobFile>& blob_file,
+    std::shared_ptr<RandomAccessFileReader>* reader) {
+  assert(reader != nullptr);
+  bool fresh_open = false;
+  Status s = blob_file->GetReader(env_, env_options_, reader, &fresh_open);
+  if (s.ok() && fresh_open) {
+    assert(*reader != nullptr);
+    open_file_count_++;
+  }
+  return s;
+}
+
+std::shared_ptr<BlobFile> BlobDBImpl::NewBlobFile(const std::string& reason) {
+  uint64_t file_num = next_file_number_++;
+  auto bfile = std::make_shared<BlobFile>(this, blob_dir_, file_num,
+                                          db_options_.info_log.get());
+  ROCKS_LOG_DEBUG(db_options_.info_log, "New blob file created: %s reason='%s'",
+                  bfile->PathName().c_str(), reason.c_str());
+  LogFlush(db_options_.info_log);
+  return bfile;
+}
+
+Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
+  std::string fpath(bfile->PathName());
+  std::unique_ptr<WritableFile> wfile;
+
+  Status s = env_->ReopenWritableFile(fpath, &wfile, env_options_);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to open blob file for write: %s status: '%s'"
+                    " exists: '%s'",
+                    fpath.c_str(), s.ToString().c_str(),
+                    env_->FileExists(fpath).ToString().c_str());
+    return s;
+  }
+
+  std::unique_ptr<WritableFileWriter> fwriter;
+  fwriter.reset(new WritableFileWriter(std::move(wfile), fpath, env_options_));
+
+  uint64_t boffset = bfile->GetFileSize();
+  if (debug_level_ >= 2 && boffset) {
+    ROCKS_LOG_DEBUG(db_options_.info_log,
+                    "Open blob file: %s with offset: %" PRIu64, fpath.c_str(),
+                    boffset);
+  }
+
+  Writer::ElemType et = Writer::kEtNone;
+  if (bfile->file_size_ == BlobLogHeader::kSize) {
+    et = Writer::kEtFileHdr;
+  } else if (bfile->file_size_ > BlobLogHeader::kSize) {
+    et = Writer::kEtRecord;
+  } else if (bfile->file_size_) {
+    ROCKS_LOG_WARN(db_options_.info_log,
+                   "Open blob file: %s with wrong size: %" PRIu64,
+                   fpath.c_str(), boffset);
+    return Status::Corruption("Invalid blob file size");
+  }
+
+  bfile->log_writer_ = std::make_shared<Writer>(
+      std::move(fwriter), env_, statistics_, bfile->file_number_,
+      bdb_options_.bytes_per_sync, db_options_.use_fsync, boffset);
+  bfile->log_writer_->last_elem_type_ = et;
+
+  return s;
+}
+
+std::shared_ptr<BlobFile> BlobDBImpl::FindBlobFileLocked(
+    uint64_t expiration) const {
+  if (open_ttl_files_.empty()) {
+    return nullptr;
+  }
+
+  std::shared_ptr<BlobFile> tmp = std::make_shared<BlobFile>();
+  tmp->SetHasTTL(true);
+  tmp->expiration_range_ = std::make_pair(expiration, 0);
+  tmp->file_number_ = std::numeric_limits<uint64_t>::max();
+
+  auto citr = open_ttl_files_.equal_range(tmp);
+  if (citr.first == open_ttl_files_.end()) {
+    assert(citr.second == open_ttl_files_.end());
+
+    std::shared_ptr<BlobFile> check = *(open_ttl_files_.rbegin());
+    return (check->expiration_range_.second <= expiration) ? nullptr : check;
+  }
+
+  if (citr.first != citr.second) {
+    return *(citr.first);
+  }
+
+  auto finditr = citr.second;
+  if (finditr != open_ttl_files_.begin()) {
+    --finditr;
+  }
+
+  bool b2 = (*finditr)->expiration_range_.second <= expiration;
+  bool b1 = (*finditr)->expiration_range_.first > expiration;
+
+  return (b1 || b2) ? nullptr : (*finditr);
+}
+
+Status BlobDBImpl::CheckOrCreateWriterLocked(
+    const std::shared_ptr<BlobFile>& blob_file,
+    std::shared_ptr<Writer>* writer) {
+  assert(writer != nullptr);
+  *writer = blob_file->GetWriter();
+  if (*writer != nullptr) {
+    return Status::OK();
+  }
+  Status s = CreateWriterLocked(blob_file);
+  if (s.ok()) {
+    *writer = blob_file->GetWriter();
+  }
+  return s;
+}
+
+Status BlobDBImpl::SelectBlobFile(std::shared_ptr<BlobFile>* blob_file) {
+  assert(blob_file != nullptr);
+  {
+    ReadLock rl(&mutex_);
+    if (open_non_ttl_file_ != nullptr) {
+      *blob_file = open_non_ttl_file_;
+      return Status::OK();
+    }
+  }
+
+  // Check again under the write lock.
+  WriteLock wl(&mutex_);
+  if (open_non_ttl_file_ != nullptr) {
+    *blob_file = open_non_ttl_file_;
+    return Status::OK();
+  }
+
+  *blob_file = NewBlobFile("SelectBlobFile");
+  assert(*blob_file != nullptr);
+
+  // The file is not visible yet, hence no lock.
+  std::shared_ptr<Writer> writer;
+  Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to get writer from blob file: %s, error: %s",
+                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
+    return s;
+  }
+
+  (*blob_file)->file_size_ = BlobLogHeader::kSize;
+  (*blob_file)->header_.compression = bdb_options_.compression;
+  (*blob_file)->header_.has_ttl = false;
+  (*blob_file)->header_.column_family_id =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+  (*blob_file)->header_valid_ = true;
+  (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
+  (*blob_file)->SetHasTTL(false);
+  (*blob_file)->SetCompression(bdb_options_.compression);
+
+  s = writer->WriteHeader((*blob_file)->header_);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to write header to new blob file: %s"
+                    " status: '%s'",
+                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
+    return s;
+  }
+
+  blob_files_.insert(
+      std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
+  open_non_ttl_file_ = *blob_file;
+  total_blob_size_ += BlobLogHeader::kSize;
+  return s;
+}
+
+Status BlobDBImpl::SelectBlobFileTTL(uint64_t expiration,
+                                     std::shared_ptr<BlobFile>* blob_file) {
+  assert(blob_file != nullptr);
+  assert(expiration != kNoExpiration);
+  uint64_t epoch_read = 0;
+  {
+    ReadLock rl(&mutex_);
+    *blob_file = FindBlobFileLocked(expiration);
+    epoch_read = epoch_of_.load();
+  }
+
+  if (*blob_file != nullptr) {
+    assert(!(*blob_file)->Immutable());
+    return Status::OK();
+  }
+
+  uint64_t exp_low =
+      (expiration / bdb_options_.ttl_range_secs) * bdb_options_.ttl_range_secs;
+  uint64_t exp_high = exp_low + bdb_options_.ttl_range_secs;
+  ExpirationRange expiration_range = std::make_pair(exp_low, exp_high);
+
+  *blob_file = NewBlobFile("SelectBlobFileTTL");
+  assert(*blob_file != nullptr);
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "New blob file TTL range: %s %" PRIu64 " %" PRIu64,
+                 (*blob_file)->PathName().c_str(), exp_low, exp_high);
+  LogFlush(db_options_.info_log);
+
+  // We don't need to take a lock since no other thread can see bfile yet.
+  std::shared_ptr<Writer> writer;
+  Status s = CheckOrCreateWriterLocked(*blob_file, &writer);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(
+        db_options_.info_log,
+        "Failed to get writer from blob file with TTL: %s, error: %s",
+        (*blob_file)->PathName().c_str(), s.ToString().c_str());
+    return s;
+  }
+
+  (*blob_file)->header_.expiration_range = expiration_range;
+  (*blob_file)->header_.compression = bdb_options_.compression;
+  (*blob_file)->header_.has_ttl = true;
+  (*blob_file)->header_.column_family_id =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+  (*blob_file)->header_valid_ = true;
+  (*blob_file)->SetColumnFamilyId((*blob_file)->header_.column_family_id);
+  (*blob_file)->SetHasTTL(true);
+  (*blob_file)->SetCompression(bdb_options_.compression);
+  (*blob_file)->file_size_ = BlobLogHeader::kSize;
+
+  // Set the first value of the range, since that is concrete at this time.
+  // It is also necessary in order to add the file to open_ttl_files_.
+  (*blob_file)->expiration_range_ = expiration_range;
+
+  WriteLock wl(&mutex_);
+  // In case the epoch has shifted in the interim, check the condition
+  // again - this should be rare.
+  if (epoch_of_.load() != epoch_read) {
+    std::shared_ptr<BlobFile> blob_file2 = FindBlobFileLocked(expiration);
+    if (blob_file2 != nullptr) {
+      *blob_file = std::move(blob_file2);
+      return Status::OK();
+    }
+  }
+
+  s = writer->WriteHeader((*blob_file)->header_);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to write header to new blob file: %s"
+                    " status: '%s'",
+                    (*blob_file)->PathName().c_str(), s.ToString().c_str());
+    return s;
+  }
+
+  blob_files_.insert(
+      std::make_pair((*blob_file)->BlobFileNumber(), *blob_file));
+  open_ttl_files_.insert(*blob_file);
+  total_blob_size_ += BlobLogHeader::kSize;
+  epoch_of_++;
+
+  return s;
+}
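To make the bucketing above concrete: with the default ttl_range_secs = 3600 and an expiration of 1471545000, exp_low = (1471545000 / 3600) * 3600 = 1471543200 and exp_high = 1471543200 + 3600 = 1471546800, so every key expiring within that hour-long window is routed to the same TTL blob file.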
+class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
+ private:
+  const WriteOptions& options_;
+  BlobDBImpl* blob_db_impl_;
+  uint32_t default_cf_id_;
+  WriteBatch batch_;
+
+ public:
+  BlobInserter(const WriteOptions& options, BlobDBImpl* blob_db_impl,
+               uint32_t default_cf_id)
+      : options_(options),
+        blob_db_impl_(blob_db_impl),
+        default_cf_id_(default_cf_id) {}
+
+  WriteBatch* batch() { return &batch_; }
+
+  Status PutCF(uint32_t column_family_id, const Slice& key,
+               const Slice& value) override {
+    if (column_family_id != default_cf_id_) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    Status s = blob_db_impl_->PutBlobValue(options_, key, value, kNoExpiration,
+                                           &batch_);
+    return s;
+  }
+
+  Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
+    if (column_family_id != default_cf_id_) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    Status s = WriteBatchInternal::Delete(&batch_, column_family_id, key);
+    return s;
+  }
+
+  virtual Status DeleteRange(uint32_t column_family_id, const Slice& begin_key,
+                             const Slice& end_key) {
+    if (column_family_id != default_cf_id_) {
+      return Status::NotSupported(
+          "Blob DB doesn't support non-default column family.");
+    }
+    Status s = WriteBatchInternal::DeleteRange(&batch_, column_family_id,
+                                               begin_key, end_key);
+    return s;
+  }
+
+  Status SingleDeleteCF(uint32_t /*column_family_id*/,
+                        const Slice& /*key*/) override {
+    return Status::NotSupported("Not supported operation in blob db.");
+  }
+
+  Status MergeCF(uint32_t /*column_family_id*/, const Slice& /*key*/,
+                 const Slice& /*value*/) override {
+    return Status::NotSupported("Not supported operation in blob db.");
+  }
+
+  void LogData(const Slice& blob) override { batch_.PutLogData(blob); }
+};
+
+Status BlobDBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
+  StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
+  RecordTick(statistics_, BLOB_DB_NUM_WRITE);
+  uint32_t default_cf_id =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+  Status s;
+  BlobInserter blob_inserter(options, this, default_cf_id);
+  {
+    // Release write_mutex_ before the DB write to avoid a race condition with
+    // the flush-begin listener, which also requires write_mutex_ to sync
+    // blob files.
+    MutexLock l(&write_mutex_);
+    s = updates->Iterate(&blob_inserter);
+  }
+  if (!s.ok()) {
+    return s;
+  }
+  return db_->Write(options, blob_inserter.batch());
+}
+
+Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
+                       const Slice& value) {
+  return PutUntil(options, key, value, kNoExpiration);
+}
+
+Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
+                              const Slice& key, const Slice& value,
+                              uint64_t ttl) {
+  uint64_t now = EpochNow();
+  uint64_t expiration = kNoExpiration - now > ttl ? now + ttl : kNoExpiration;
+  return PutUntil(options, key, value, expiration);
+}
+
+Status BlobDBImpl::PutUntil(const WriteOptions& options, const Slice& key,
+                            const Slice& value, uint64_t expiration) {
+  StopWatch write_sw(env_, statistics_, BLOB_DB_WRITE_MICROS);
+  RecordTick(statistics_, BLOB_DB_NUM_PUT);
+  TEST_SYNC_POINT("BlobDBImpl::PutUntil:Start");
+  Status s;
+  WriteBatch batch;
+  {
+    // Release write_mutex_ before the DB write to avoid a race condition with
+    // the flush-begin listener, which also requires write_mutex_ to sync
+    // blob files.
+    MutexLock l(&write_mutex_);
+    s = PutBlobValue(options, key, value, expiration, &batch);
+  }
+  if (s.ok()) {
+    s = db_->Write(options, &batch);
+  }
+  TEST_SYNC_POINT("BlobDBImpl::PutUntil:Finish");
+  return s;
+}
+
+Status BlobDBImpl::PutBlobValue(const WriteOptions& /*options*/,
+                                const Slice& key, const Slice& value,
+                                uint64_t expiration, WriteBatch* batch) {
+  write_mutex_.AssertHeld();
+  Status s;
+  std::string index_entry;
+  uint32_t column_family_id =
+      reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->GetID();
+  if (value.size() < bdb_options_.min_blob_size) {
+    if (expiration == kNoExpiration) {
+      // Put as a normal value.
+      s = batch->Put(key, value);
+      RecordTick(statistics_, BLOB_DB_WRITE_INLINED);
+    } else {
+      // Inlined with TTL.
+      BlobIndex::EncodeInlinedTTL(&index_entry, expiration, value);
+      s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
+                                           index_entry);
+      RecordTick(statistics_, BLOB_DB_WRITE_INLINED_TTL);
+    }
+  } else {
+    std::string compression_output;
+    Slice value_compressed = GetCompressedSlice(value, &compression_output);
+
+    std::string headerbuf;
+    Writer::ConstructBlobHeader(&headerbuf, key, value_compressed, expiration);
+
+    // Check the DB size limit before selecting a blob file. Since
+    // CheckSizeAndEvictBlobFiles() can close blob files, it needs to be
+    // done before calling SelectBlobFile().
+    s = CheckSizeAndEvictBlobFiles(headerbuf.size() + key.size() +
+                                   value_compressed.size());
+    if (!s.ok()) {
+      return s;
+    }
+
+    std::shared_ptr<BlobFile> blob_file;
+    if (expiration != kNoExpiration) {
+      s = SelectBlobFileTTL(expiration, &blob_file);
+    } else {
+      s = SelectBlobFile(&blob_file);
+    }
+    if (s.ok()) {
+      assert(blob_file != nullptr);
+      assert(blob_file->compression() == bdb_options_.compression);
+      s = AppendBlob(blob_file, headerbuf, key, value_compressed, expiration,
+                     &index_entry);
+    }
+    if (s.ok()) {
+      if (expiration != kNoExpiration) {
+        blob_file->ExtendExpirationRange(expiration);
+      }
+      s = CloseBlobFileIfNeeded(blob_file);
+    }
+    if (s.ok()) {
+      s = WriteBatchInternal::PutBlobIndex(batch, column_family_id, key,
+                                           index_entry);
+    }
+    if (s.ok()) {
+      if (expiration == kNoExpiration) {
+        RecordTick(statistics_, BLOB_DB_WRITE_BLOB);
+      } else {
+        RecordTick(statistics_, BLOB_DB_WRITE_BLOB_TTL);
+      }
+    } else {
+      ROCKS_LOG_ERROR(
+          db_options_.info_log,
+          "Failed to append blob to FILE: %s: KEY: %s VALSZ: %" ROCKSDB_PRIszt
+          " status: '%s' blob_file: '%s'",
+          blob_file->PathName().c_str(), key.ToString().c_str(), value.size(),
+          s.ToString().c_str(), blob_file->DumpState().c_str());
+    }
+  }
+
+  RecordTick(statistics_, BLOB_DB_NUM_KEYS_WRITTEN);
+  RecordTick(statistics_, BLOB_DB_BYTES_WRITTEN, key.size() + value.size());
+  RecordInHistogram(statistics_, BLOB_DB_KEY_SIZE, key.size());
+  RecordInHistogram(statistics_, BLOB_DB_VALUE_SIZE, value.size());
+
+  return s;
+}
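The index entries that PutBlobValue() writes (via BlobIndex::EncodeInlinedTTL above and EncodeBlob/EncodeBlobTTL in AppendBlob() below) are the same entries the compaction filter and GetBlobValue() later decode. A hedged sketch of the consumer side, using only the BlobIndex accessors visible elsewhere in this patch:

#include "rocksdb/status.h"
#include "utilities/blob_db/blob_index.h"

// Hypothetical helper showing a BlobIndex round-trip on the read side.
void InspectIndexEntry(const rocksdb::Slice& index_entry) {
  rocksdb::blob_db::BlobIndex blob_index;
  rocksdb::Status s = blob_index.DecodeFrom(index_entry);
  if (!s.ok()) {
    return;  // undecodable entry
  }
  if (blob_index.IsInlined()) {
    // Small value stored inline in the base DB: blob_index.value().
  } else {
    // Pointer into a blob file: blob_index.file_number(),
    // blob_index.offset(), blob_index.size().
  }
  if (blob_index.HasTTL()) {
    // blob_index.expiration() is an absolute epoch timestamp.
  }
}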
Current live" + " SST size: %" PRIu64 + " , current blob files size: %" PRIu64 ".", + live_sst_size_.load(), total_blob_size_.load()); + } + } +} + +Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size, + bool force_evict) { + write_mutex_.AssertHeld(); + + uint64_t live_sst_size = live_sst_size_.load(); + if (bdb_options_.max_db_size == 0 || + live_sst_size + total_blob_size_.load() + blob_size <= + bdb_options_.max_db_size) { + return Status::OK(); + } + + if (bdb_options_.is_fifo == false || + (!force_evict && live_sst_size + blob_size > bdb_options_.max_db_size)) { + // FIFO eviction is disabled, or no space to insert new blob even we evict + // all blob files. + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); + } + + std::vector<std::shared_ptr<BlobFile>> candidate_files; + CopyBlobFiles(&candidate_files); + std::sort(candidate_files.begin(), candidate_files.end(), + BlobFileComparator()); + fifo_eviction_seq_ = GetLatestSequenceNumber(); + + WriteLock l(&mutex_); + + while (!candidate_files.empty() && + live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + std::shared_ptr<BlobFile> blob_file = candidate_files.back(); + candidate_files.pop_back(); + WriteLock file_lock(&blob_file->mutex_); + if (blob_file->Obsolete()) { + // File already obsoleted by someone else. + continue; + } + // FIFO eviction can evict open blob files. + if (!blob_file->Immutable()) { + Status s = CloseBlobFile(blob_file, false /*need_lock*/); + if (!s.ok()) { + return s; + } + } + assert(blob_file->Immutable()); + auto expiration_range = blob_file->GetExpirationRange(); + ROCKS_LOG_INFO(db_options_.info_log, + "Evict oldest blob file since DB out of space. Current " + "live SST file size: %" PRIu64 ", total blob size: %" PRIu64 + ", max db size: %" PRIu64 ", evicted blob file #%" PRIu64 + ".", + live_sst_size, total_blob_size_.load(), + bdb_options_.max_db_size, blob_file->BlobFileNumber()); + ObsoleteBlobFile(blob_file, fifo_eviction_seq_, true /*update_size*/); + evict_expiration_up_to_ = expiration_range.first; + RecordTick(statistics_, BLOB_DB_FIFO_NUM_FILES_EVICTED); + RecordTick(statistics_, BLOB_DB_FIFO_NUM_KEYS_EVICTED, + blob_file->BlobCount()); + RecordTick(statistics_, BLOB_DB_FIFO_BYTES_EVICTED, + blob_file->GetFileSize()); + TEST_SYNC_POINT("BlobDBImpl::EvictOldestBlobFile:Evicted"); + } + if (live_sst_size + total_blob_size_.load() + blob_size > + bdb_options_.max_db_size) { + return Status::NoSpace( + "Write failed, as writing it would exceed max_db_size limit."); + } + return Status::OK(); +} + +Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile, + const std::string& headerbuf, const Slice& key, + const Slice& value, uint64_t expiration, + std::string* index_entry) { + Status s; + uint64_t blob_offset = 0; + uint64_t key_offset = 0; + { + WriteLock lockbfile_w(&bfile->mutex_); + std::shared_ptr<Writer> writer; + s = CheckOrCreateWriterLocked(bfile, &writer); + if (!s.ok()) { + return s; + } + + // write the blob to the blob log. 
+    s = writer->EmitPhysicalRecord(headerbuf, key, value, &key_offset,
+                                   &blob_offset);
+  }
+
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Invalid status in AppendBlob: %s status: '%s'",
+                    bfile->PathName().c_str(), s.ToString().c_str());
+    return s;
+  }
+
+  // Increment blob count.
+  bfile->blob_count_++;
+
+  uint64_t size_put = headerbuf.size() + key.size() + value.size();
+  bfile->file_size_ += size_put;
+  total_blob_size_ += size_put;
+
+  if (expiration == kNoExpiration) {
+    BlobIndex::EncodeBlob(index_entry, bfile->BlobFileNumber(), blob_offset,
+                          value.size(), bdb_options_.compression);
+  } else {
+    BlobIndex::EncodeBlobTTL(index_entry, expiration, bfile->BlobFileNumber(),
+                             blob_offset, value.size(),
+                             bdb_options_.compression);
+  }
+
+  return s;
+}
+
+std::vector<Status> BlobDBImpl::MultiGet(
+    const ReadOptions& read_options,
+    const std::vector<Slice>& keys, std::vector<std::string>* values) {
+  StopWatch multiget_sw(env_, statistics_, BLOB_DB_MULTIGET_MICROS);
+  RecordTick(statistics_, BLOB_DB_NUM_MULTIGET);
+  // Take a snapshot to avoid the blob file being deleted between fetching
+  // the index entry and reading from the file.
+  ReadOptions ro(read_options);
+  bool snapshot_created = SetSnapshotIfNeeded(&ro);
+
+  std::vector<Status> statuses;
+  statuses.reserve(keys.size());
+  values->clear();
+  values->reserve(keys.size());
+  PinnableSlice value;
+  for (size_t i = 0; i < keys.size(); i++) {
+    statuses.push_back(Get(ro, DefaultColumnFamily(), keys[i], &value));
+    values->push_back(value.ToString());
+    value.Reset();
+  }
+  if (snapshot_created) {
+    db_->ReleaseSnapshot(ro.snapshot);
+  }
+  return statuses;
+}
+
+bool BlobDBImpl::SetSnapshotIfNeeded(ReadOptions* read_options) {
+  assert(read_options != nullptr);
+  if (read_options->snapshot != nullptr) {
+    return false;
+  }
+  read_options->snapshot = db_->GetSnapshot();
+  return true;
+}
+
+Status BlobDBImpl::GetBlobValue(const Slice& key, const Slice& index_entry,
+                                PinnableSlice* value, uint64_t* expiration) {
+  assert(value != nullptr);
+  BlobIndex blob_index;
+  Status s = blob_index.DecodeFrom(index_entry);
+  if (!s.ok()) {
+    return s;
+  }
+  if (blob_index.HasTTL() && blob_index.expiration() <= EpochNow()) {
+    return Status::NotFound("Key expired");
+  }
+  if (expiration != nullptr) {
+    if (blob_index.HasTTL()) {
+      *expiration = blob_index.expiration();
+    } else {
+      *expiration = kNoExpiration;
+    }
+  }
+  if (blob_index.IsInlined()) {
+    // TODO(yiwu): If index_entry is a PinnableSlice, we can also pin the same
+    // memory buffer to avoid an extra copy.
+    value->PinSelf(blob_index.value());
+    return Status::OK();
+  }
+  if (blob_index.size() == 0) {
+    value->PinSelf("");
+    return Status::OK();
+  }
+
+  // The offset needs a certain minimum, since we will later read the CRC
+  // from the blob header, which must also be at a valid offset.
+  if (blob_index.offset() <
+      (BlobLogHeader::kSize + BlobLogRecord::kHeaderSize + key.size())) {
+    if (debug_level_ >= 2) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Invalid blob index file_number: %" PRIu64
+                      " blob_offset: %" PRIu64 " blob_size: %" PRIu64
+                      " key: %s",
+                      blob_index.file_number(), blob_index.offset(),
+                      blob_index.size(), key.data());
+    }
+    return Status::NotFound("Invalid blob offset");
+  }
+
+  std::shared_ptr<BlobFile> bfile;
+  {
+    ReadLock rl(&mutex_);
+    auto hitr = blob_files_.find(blob_index.file_number());
+
+    // File was deleted.
+    if (hitr == blob_files_.end()) {
+      return Status::NotFound("Blob Not Found as blob file missing");
+    }
+
+    bfile = hitr->second;
+  }
+
+  if (blob_index.size() == 0 && value != nullptr) {
+    value->PinSelf("");
+    return Status::OK();
+  }
+
+  // Takes locks when called.
+  std::shared_ptr<RandomAccessFileReader> reader;
+  s = GetBlobFileReader(bfile, &reader);
+  if (!s.ok()) {
+    return s;
+  }
+
+  assert(blob_index.offset() > key.size() + sizeof(uint32_t));
+  uint64_t record_offset = blob_index.offset() - key.size() - sizeof(uint32_t);
+  uint64_t record_size = sizeof(uint32_t) + key.size() + blob_index.size();
+
+  // Allocate the buffer. This is safe in C++11.
+  std::string buffer_str(static_cast<size_t>(record_size),
+                         static_cast<char>(0));
+  char* buffer = &buffer_str[0];
+
+  // A partial blob record contains the checksum, key and value.
+  Slice blob_record;
+  {
+    StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+    s = reader->Read(record_offset, static_cast<size_t>(record_size),
+                     &blob_record, buffer);
+    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, blob_record.size());
+  }
+  if (!s.ok()) {
+    ROCKS_LOG_DEBUG(db_options_.info_log,
+                    "Failed to read blob from blob file %" PRIu64
+                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
+                    ", key_size: %" ROCKSDB_PRIszt ", status: '%s'",
+                    bfile->BlobFileNumber(), blob_index.offset(),
+                    blob_index.size(), key.size(), s.ToString().c_str());
+    return s;
+  }
+  if (blob_record.size() != record_size) {
+    ROCKS_LOG_DEBUG(
+        db_options_.info_log,
+        "Failed to read blob from blob file %" PRIu64 ", blob_offset: %" PRIu64
+        ", blob_size: %" PRIu64 ", key_size: %" ROCKSDB_PRIszt
+        ", read %" ROCKSDB_PRIszt " bytes, expected %" PRIu64 " bytes",
+        bfile->BlobFileNumber(), blob_index.offset(), blob_index.size(),
+        key.size(), blob_record.size(), record_size);
+
+    return Status::Corruption("Failed to retrieve blob from blob index.");
+  }
+  Slice crc_slice(blob_record.data(), sizeof(uint32_t));
+  Slice blob_value(blob_record.data() + sizeof(uint32_t) + key.size(),
+                   static_cast<size_t>(blob_index.size()));
+  uint32_t crc_exp;
+  if (!GetFixed32(&crc_slice, &crc_exp)) {
+    ROCKS_LOG_DEBUG(db_options_.info_log,
+                    "Unable to decode CRC from blob file %" PRIu64
+                    ", blob_offset: %" PRIu64 ", blob_size: %" PRIu64
+                    ", key size: %" ROCKSDB_PRIszt ", status: '%s'",
+                    bfile->BlobFileNumber(), blob_index.offset(),
+                    blob_index.size(), key.size(), s.ToString().c_str());
+    return Status::Corruption("Unable to decode checksum.");
+  }
+
+  uint32_t crc = crc32c::Value(blob_record.data() + sizeof(uint32_t),
+                               blob_record.size() - sizeof(uint32_t));
+  crc = crc32c::Mask(crc);  // Adjust for storage
+  if (crc != crc_exp) {
+    if (debug_level_ >= 2) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Blob crc mismatch file: %s blob_offset: %" PRIu64
+                      " blob_size: %" PRIu64 " key: %s status: '%s'",
+                      bfile->PathName().c_str(), blob_index.offset(),
+                      blob_index.size(), key.data(), s.ToString().c_str());
+    }
+    return Status::Corruption("Corruption. Blob CRC mismatch");
+  }
+
+  if (bfile->compression() == kNoCompression) {
+    value->PinSelf(blob_value);
+  } else {
+    BlockContents contents;
+    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
+    {
+      StopWatch decompression_sw(env_, statistics_,
+                                 BLOB_DB_DECOMPRESSION_MICROS);
+      UncompressionContext context(bfile->compression());
+      UncompressionInfo info(context, UncompressionDict::GetEmptyDict(),
+                             bfile->compression());
+      s = UncompressBlockContentsForCompressionType(
+          info, blob_value.data(), blob_value.size(), &contents,
+          kBlockBasedTableVersionFormat, *(cfh->cfd()->ioptions()));
+    }
+    value->PinSelf(contents.data);
+  }
+
+  return s;
+}
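The checksum convention used in GetBlobValue() above comes from RocksDB's util/crc32c.h: the stored 4-byte value is the masked CRC of everything in the record after the CRC field itself. A small verification sketch of that convention:

#include <cstddef>
#include <cstdint>
#include "util/crc32c.h"

// Returns true if `record` (CRC field followed by key and value bytes)
// matches the stored masked CRC, mirroring the check in GetBlobValue().
bool VerifyBlobRecord(const char* record, size_t record_size,
                      uint32_t stored_masked_crc) {
  uint32_t crc = rocksdb::crc32c::Value(record + sizeof(uint32_t),
                                        record_size - sizeof(uint32_t));
  return rocksdb::crc32c::Mask(crc) == stored_masked_crc;
}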
+Status BlobDBImpl::Get(const ReadOptions& read_options,
+                       ColumnFamilyHandle* column_family, const Slice& key,
+                       PinnableSlice* value) {
+  return Get(read_options, column_family, key, value, nullptr /*expiration*/);
+}
+
+Status BlobDBImpl::Get(const ReadOptions& read_options,
+                       ColumnFamilyHandle* column_family, const Slice& key,
+                       PinnableSlice* value, uint64_t* expiration) {
+  StopWatch get_sw(env_, statistics_, BLOB_DB_GET_MICROS);
+  RecordTick(statistics_, BLOB_DB_NUM_GET);
+  return GetImpl(read_options, column_family, key, value, expiration);
+}
+
+Status BlobDBImpl::GetImpl(const ReadOptions& read_options,
+                           ColumnFamilyHandle* column_family, const Slice& key,
+                           PinnableSlice* value, uint64_t* expiration) {
+  if (column_family != DefaultColumnFamily()) {
+    return Status::NotSupported(
+        "Blob DB doesn't support non-default column family.");
+  }
+  // Take a snapshot to avoid the blob file being deleted between fetching
+  // the index entry and reading from the file.
+  // TODO(yiwu): For Get(), retrying if the file is not found would be a
+  // simpler strategy.
+  ReadOptions ro(read_options);
+  bool snapshot_created = SetSnapshotIfNeeded(&ro);
+
+  PinnableSlice index_entry;
+  Status s;
+  bool is_blob_index = false;
+  s = db_impl_->GetImpl(ro, column_family, key, &index_entry,
+                        nullptr /*value_found*/, nullptr /*read_callback*/,
+                        &is_blob_index);
+  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:1");
+  TEST_SYNC_POINT("BlobDBImpl::Get:AfterIndexEntryGet:2");
+  if (expiration != nullptr) {
+    *expiration = kNoExpiration;
+  }
+  RecordTick(statistics_, BLOB_DB_NUM_KEYS_READ);
+  if (s.ok()) {
+    if (is_blob_index) {
+      s = GetBlobValue(key, index_entry, value, expiration);
+    } else {
+      // The index entry is the value itself in this case.
+      value->PinSelf(index_entry);
+    }
+    RecordTick(statistics_, BLOB_DB_BYTES_READ, value->size());
+  }
+  if (snapshot_created) {
+    db_->ReleaseSnapshot(ro.snapshot);
+  }
+  return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
+  if (aborted) {
+    return std::make_pair(false, -1);
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log, "Starting Sanity Check");
+  ROCKS_LOG_INFO(db_options_.info_log, "Number of files %" ROCKSDB_PRIszt,
+                 blob_files_.size());
+  ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
+                 open_ttl_files_.size());
+
+  for (auto bfile : open_ttl_files_) {
+    assert(!bfile->Immutable());
+  }
+
+  uint64_t now = EpochNow();
+
+  for (auto blob_file_pair : blob_files_) {
+    auto blob_file = blob_file_pair.second;
+    char buf[1000];
+    int pos = snprintf(buf, sizeof(buf),
+                       "Blob file %" PRIu64 ", size %" PRIu64
+                       ", blob count %" PRIu64 ", immutable %d",
+                       blob_file->BlobFileNumber(), blob_file->GetFileSize(),
+                       blob_file->BlobCount(), blob_file->Immutable());
+    if (blob_file->HasTTL()) {
+      auto expiration_range = blob_file->GetExpirationRange();
+      pos += snprintf(buf + pos, sizeof(buf) - pos,
+                      ", expiration range (%" PRIu64 ", %" PRIu64 ")",
+                      expiration_range.first, expiration_range.second);
+      if (!blob_file->Obsolete()) {
+        pos += snprintf(buf + pos, sizeof(buf) - pos,
+                        ", expire in %" PRIu64 " seconds",
+                        expiration_range.second - now);
+      }
+    }
+    if (blob_file->Obsolete()) {
+      pos += snprintf(buf + pos, sizeof(buf) - pos, ", obsolete at %" PRIu64,
+                      blob_file->GetObsoleteSequence());
+    }
+    snprintf(buf + pos, sizeof(buf) - pos, ".");
+    ROCKS_LOG_INFO(db_options_.info_log, "%s", buf);
+  }
+
+  // Reschedule.
+  return std::make_pair(true, -1);
+}
+
+Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile,
+                                 bool need_lock) {
+  assert(bfile != nullptr);
+  write_mutex_.AssertHeld();
+  Status s;
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "Closing blob file %" PRIu64 ". Path: %s",
+                 bfile->BlobFileNumber(), bfile->PathName().c_str());
+  {
+    std::unique_ptr<WriteLock> lock;
+    if (need_lock) {
+      lock.reset(new WriteLock(&mutex_));
+    }
+
+    if (bfile->HasTTL()) {
+      size_t erased __attribute__((__unused__));
+      erased = open_ttl_files_.erase(bfile);
+    } else if (bfile == open_non_ttl_file_) {
+      open_non_ttl_file_ = nullptr;
+    }
+  }
+
+  if (!bfile->closed_.load()) {
+    std::unique_ptr<WriteLock> file_lock;
+    if (need_lock) {
+      file_lock.reset(new WriteLock(&bfile->mutex_));
+    }
+    s = bfile->WriteFooterAndCloseLocked();
+  }
+
+  if (s.ok()) {
+    total_blob_size_ += BlobLogFooter::kSize;
+  } else {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to close blob file %" PRIu64 " with error: %s",
+                    bfile->BlobFileNumber(), s.ToString().c_str());
+  }
+
+  return s;
+}
+
+Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
+  // Atomic read.
+  if (bfile->GetFileSize() < bdb_options_.blob_file_size) {
+    return Status::OK();
+  }
+  return CloseBlobFile(bfile);
+}
+
+void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
+                                  SequenceNumber obsolete_seq,
+                                  bool update_size) {
+  // Should hold the write lock of mutex_, or be called during DB open.
+  blob_file->MarkObsolete(obsolete_seq);
+  obsolete_files_.push_back(blob_file);
+  assert(total_blob_size_.load() >= blob_file->GetFileSize());
+  if (update_size) {
+    total_blob_size_ -= blob_file->GetFileSize();
+  }
+}
+
+bool BlobDBImpl::VisibleToActiveSnapshot(
+    const std::shared_ptr<BlobFile>& bfile) {
+  assert(bfile->Obsolete());
+
+  // We check whether the oldest snapshot is no less than the last sequence
+  // number at the time the blob file became obsolete. If so, the blob file
+  // is not visible to any existing snapshot.
+  //
+  // If we kept track of the earliest sequence number of the keys in the
+  // blob file, we could instead check whether a snapshot falls in the range
+  // [earliest_sequence, obsolete_sequence), but doing so would make the
+  // implementation more complicated.
+  SequenceNumber obsolete_sequence = bfile->GetObsoleteSequence();
+  SequenceNumber oldest_snapshot = kMaxSequenceNumber;
+  {
+    // Need to lock the DBImpl mutex before accessing the snapshot list.
+    InstrumentedMutexLock l(db_impl_->mutex());
+    auto& snapshots = db_impl_->snapshots();
+    if (!snapshots.empty()) {
+      oldest_snapshot = snapshots.oldest()->GetSequenceNumber();
+    }
+  }
+  bool visible = oldest_snapshot < obsolete_sequence;
+  if (visible) {
+    ROCKS_LOG_INFO(db_options_.info_log,
+                   "Obsolete blob file %" PRIu64 " (obsolete at %" PRIu64
+                   ") visible to oldest snapshot %" PRIu64 ".",
+                   bfile->BlobFileNumber(), obsolete_sequence, oldest_snapshot);
+  }
+  return visible;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
+  if (aborted) {
+    return std::make_pair(false, -1);
+  }
+
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:0");
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:1");
+
+  std::vector<std::shared_ptr<BlobFile>> process_files;
+  uint64_t now = EpochNow();
+  {
+    ReadLock rl(&mutex_);
+    for (auto p : blob_files_) {
+      auto& blob_file = p.second;
+      ReadLock file_lock(&blob_file->mutex_);
+      if (blob_file->HasTTL() && !blob_file->Obsolete() &&
+          blob_file->GetExpirationRange().second <= now) {
+        process_files.push_back(blob_file);
+      }
+    }
+  }
+
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:2");
+  TEST_SYNC_POINT("BlobDBImpl::EvictExpiredFiles:3");
+  TEST_SYNC_POINT_CALLBACK("BlobDBImpl::EvictExpiredFiles:cb", nullptr);
+
+  SequenceNumber seq = GetLatestSequenceNumber();
+  {
+    MutexLock l(&write_mutex_);
+    for (auto& blob_file : process_files) {
+      WriteLock file_lock(&blob_file->mutex_);
+      if (!blob_file->Immutable()) {
+        CloseBlobFile(blob_file, false /*need_lock*/);
+      }
+      // Need to double check if the file is obsolete.
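+      // (The file was sampled as non-obsolete under the read lock above,
+      // but FIFO eviction may have obsoleted it in the meantime, hence the
+      // re-check under the file write lock.)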
+      if (!blob_file->Obsolete()) {
+        ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
+      }
+    }
+  }
+
+  return std::make_pair(true, -1);
+}
+
+Status BlobDBImpl::SyncBlobFiles() {
+  MutexLock l(&write_mutex_);
+
+  std::vector<std::shared_ptr<BlobFile>> process_files;
+  {
+    ReadLock rl(&mutex_);
+    for (auto fitr : open_ttl_files_) {
+      process_files.push_back(fitr);
+    }
+    if (open_non_ttl_file_ != nullptr) {
+      process_files.push_back(open_non_ttl_file_);
+    }
+  }
+
+  Status s;
+  for (auto& blob_file : process_files) {
+    s = blob_file->Fsync();
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Failed to sync blob file %" PRIu64 ", status: %s",
+                      blob_file->BlobFileNumber(), s.ToString().c_str());
+      return s;
+    }
+  }
+
+  s = dir_ent_->Fsync();
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to sync blob directory, status: %s",
+                    s.ToString().c_str());
+  }
+  return s;
+}
+
+std::pair<bool, int64_t> BlobDBImpl::ReclaimOpenFiles(bool aborted) {
+  if (aborted) return std::make_pair(false, -1);
+
+  if (open_file_count_.load() < kOpenFilesTrigger) {
+    return std::make_pair(true, -1);
+  }
+
+  // In the future, we should sort by last_access_
+  // instead of closing every file.
+  ReadLock rl(&mutex_);
+  for (auto const& ent : blob_files_) {
+    auto bfile = ent.second;
+    if (bfile->last_access_.load() == -1) continue;
+
+    WriteLock lockbfile_w(&bfile->mutex_);
+    CloseRandomAccessLocked(bfile);
+  }
+
+  return std::make_pair(true, -1);
+}
+
+// Write callback for garbage collection to check whether the key has been
+// updated since it was last read. Similar to how OptimisticTransaction
+// works. See inline comment in GCFileAndUpdateLSM().
+class BlobDBImpl::GarbageCollectionWriteCallback : public WriteCallback {
+ public:
+  GarbageCollectionWriteCallback(ColumnFamilyData* cfd, const Slice& key,
+                                 SequenceNumber upper_bound)
+      : cfd_(cfd), key_(key), upper_bound_(upper_bound) {}
+
+  Status Callback(DB* db) override {
+    auto* db_impl = reinterpret_cast<DBImpl*>(db);
+    auto* sv = db_impl->GetAndRefSuperVersion(cfd_);
+    SequenceNumber latest_seq = 0;
+    bool found_record_for_key = false;
+    bool is_blob_index = false;
+    Status s = db_impl->GetLatestSequenceForKey(
+        sv, key_, false /*cache_only*/, &latest_seq, &found_record_for_key,
+        &is_blob_index);
+    db_impl->ReturnAndCleanupSuperVersion(cfd_, sv);
+    if (!s.ok() && !s.IsNotFound()) {
+      // Error.
+      assert(!s.IsBusy());
+      return s;
+    }
+    if (s.IsNotFound()) {
+      assert(!found_record_for_key);
+      return Status::Busy("Key deleted");
+    }
+    assert(found_record_for_key);
+    assert(is_blob_index);
+    if (latest_seq > upper_bound_) {
+      return Status::Busy("Key overwritten");
+    }
+    return s;
+  }
+
+  bool AllowWriteBatching() override { return false; }
+
+ private:
+  ColumnFamilyData* cfd_;
+  // Key to check
+  Slice key_;
+  // Upper bound of the sequence number for the write to proceed.
+  SequenceNumber upper_bound_;
+};
+
+// Iterate over the blob records sequentially and check whether each record's
+// sequence number is still the latest for its key. If it is, relocate the
+// record to a new file; otherwise drop it:
+// - If the record is TTL-based and the TTL has expired, delete the key from
+//   the base DB, provided the key is still the latest version or is no
+//   longer found.
+// - If the key has been overwritten, the blob record can be dropped without
+//   doing anything, as long as the earliest snapshot does not refer to the
+//   old sequence number, i.e. it is later than the sequence number of the
+//   new key.
+// - If the record is not TTL-based, the record can be dropped once the key
+//   has been deleted in the LSM.
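+// Illustrative example: if a record for key K lives at offset O in file F,
+// GC relocates it only while the base DB's index entry still points at
+// exactly (F, O). A mismatching file number or offset, a missing key, or an
+// inlined value all count as "overwritten" below.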
+Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
+                                      GCStats* gc_stats) {
+  StopWatch gc_sw(env_, statistics_, BLOB_DB_GC_MICROS);
+  uint64_t now = EpochNow();
+
+  std::shared_ptr<Reader> reader =
+      bfptr->OpenRandomAccessReader(env_, db_options_, env_options_);
+  if (!reader) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "File sequential reader could not be opened for %s",
+                    bfptr->PathName().c_str());
+    return Status::IOError("failed to create sequential reader");
+  }
+
+  BlobLogHeader header;
+  Status s = reader->ReadHeader(&header);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(db_options_.info_log,
+                    "Failed to read header for blob file %s",
+                    bfptr->PathName().c_str());
+    return s;
+  }
+
+  auto cfh = db_impl_->DefaultColumnFamily();
+  auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh)->cfd();
+  auto column_family_id = cfd->GetID();
+  bool has_ttl = header.has_ttl;
+
+  // This reads the key but skips the blob.
+  Reader::ReadLevel shallow = Reader::kReadHeaderKey;
+
+  bool file_expired = has_ttl && now >= bfptr->GetExpirationRange().second;
+
+  if (!file_expired) {
+    // Read the blob because we have to write it back to a new file.
+    shallow = Reader::kReadHeaderKeyBlob;
+  }
+
+  BlobLogRecord record;
+  std::shared_ptr<BlobFile> newfile;
+  std::shared_ptr<Writer> new_writer;
+  uint64_t blob_offset = 0;
+
+  while (true) {
+    assert(s.ok());
+
+    // Read the next blob record.
+    Status read_record_status =
+        reader->ReadRecord(&record, shallow, &blob_offset);
+    // Exit if we reach the end of the blob file.
+    // TODO(yiwu): properly handle ReadRecord error.
+    if (!read_record_status.ok()) {
+      break;
+    }
+    gc_stats->blob_count++;
+
+    // Similar to OptimisticTransaction, we obtain latest_seq from the
+    // base DB, which is guaranteed to be no smaller than the sequence of
+    // the current key. We then use a WriteCallback at write time to
+    // re-check the key's sequence. If the key's sequence is larger than
+    // latest_seq, we know a new version has been inserted and the old blob
+    // can be discarded.
+    //
+    // We cannot use OptimisticTransaction because we need to pass the
+    // is_blob_index flag to GetImpl.
+    SequenceNumber latest_seq = GetLatestSequenceNumber();
+    bool is_blob_index = false;
+    PinnableSlice index_entry;
+    Status get_status = db_impl_->GetImpl(
+        ReadOptions(), cfh, record.key, &index_entry, nullptr /*value_found*/,
+        nullptr /*read_callback*/, &is_blob_index);
+    TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB");
+    if (!get_status.ok() && !get_status.IsNotFound()) {
+      // error
+      s = get_status;
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Error while getting index entry: %s",
+                      s.ToString().c_str());
+      break;
+    }
+    if (get_status.IsNotFound() || !is_blob_index) {
+      // Either the key is deleted or updated with a newer version which is
+      // inlined in the LSM.
+      gc_stats->num_keys_overwritten++;
+      gc_stats->bytes_overwritten += record.record_size();
+      continue;
+    }
+
+    BlobIndex blob_index;
+    s = blob_index.DecodeFrom(index_entry);
+    if (!s.ok()) {
+      ROCKS_LOG_ERROR(db_options_.info_log,
+                      "Error while decoding index entry: %s",
+                      s.ToString().c_str());
+      break;
+    }
+    if (blob_index.IsInlined() ||
+        blob_index.file_number() != bfptr->BlobFileNumber() ||
+        blob_index.offset() != blob_offset) {
+      // Key has been overwritten. Drop the blob record.
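+      // (A mismatch here means the base DB no longer points at this exact
+      // record, e.g. the key was rewritten by a newer Put or the record was
+      // already relocated by an earlier GC pass.)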
+ gc_stats->num_keys_overwritten++; + gc_stats->bytes_overwritten += record.record_size(); + continue; + } + + GarbageCollectionWriteCallback callback(cfd, record.key, latest_seq); + + // If key has expired, remove it from base DB. + // TODO(yiwu): Blob indexes will be remove by BlobIndexCompactionFilter. + // We can just drop the blob record. + if (file_expired || (has_ttl && now >= record.expiration)) { + gc_stats->num_keys_expired++; + gc_stats->bytes_expired += record.record_size(); + TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"); + WriteBatch delete_batch; + Status delete_status = delete_batch.Delete(record.key); + if (delete_status.ok()) { + delete_status = db_impl_->WriteWithCallback(WriteOptions(), + &delete_batch, &callback); + } + if (!delete_status.ok() && !delete_status.IsBusy()) { + // We hit an error. + s = delete_status; + ROCKS_LOG_ERROR(db_options_.info_log, + "Error while deleting expired key: %s", + s.ToString().c_str()); + break; + } + // Continue to next blob record or retry. + continue; + } + + // Relocate the blob record to new file. + if (!newfile) { + // new file + std::string reason("GC of "); + reason += bfptr->PathName(); + newfile = NewBlobFile(reason); + + s = CheckOrCreateWriterLocked(newfile, &new_writer); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "Failed to open file %s for writer, error: %s", + newfile->PathName().c_str(), s.ToString().c_str()); + break; + } + // Can't use header beyond this point + newfile->header_ = std::move(header); + newfile->header_valid_ = true; + newfile->file_size_ = BlobLogHeader::kSize; + newfile->SetColumnFamilyId(bfptr->column_family_id()); + newfile->SetHasTTL(bfptr->HasTTL()); + newfile->SetCompression(bfptr->compression()); + newfile->expiration_range_ = bfptr->expiration_range_; + + s = new_writer->WriteHeader(newfile->header_); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "File: %s - header writing failed", + newfile->PathName().c_str()); + break; + } + + // We don't add the file to open_ttl_files_ or open_non_ttl_files_, to + // avoid user writes writing to the file, and avoid + // EvictExpiredFiles close the file by mistake. + WriteLock wl(&mutex_); + blob_files_.insert(std::make_pair(newfile->BlobFileNumber(), newfile)); + } + + std::string new_index_entry; + uint64_t new_blob_offset = 0; + uint64_t new_key_offset = 0; + // write the blob to the blob log. + s = new_writer->AddRecord(record.key, record.value, record.expiration, + &new_key_offset, &new_blob_offset); + + BlobIndex::EncodeBlob(&new_index_entry, newfile->BlobFileNumber(), + new_blob_offset, record.value.size(), + bdb_options_.compression); + + newfile->blob_count_++; + newfile->file_size_ += + BlobLogRecord::kHeaderSize + record.key.size() + record.value.size(); + + TEST_SYNC_POINT("BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"); + WriteBatch rewrite_batch; + Status rewrite_status = WriteBatchInternal::PutBlobIndex( + &rewrite_batch, column_family_id, record.key, new_index_entry); + if (rewrite_status.ok()) { + rewrite_status = db_impl_->WriteWithCallback(WriteOptions(), + &rewrite_batch, &callback); + } + if (rewrite_status.ok()) { + gc_stats->num_keys_relocated++; + gc_stats->bytes_relocated += record.record_size(); + } else if (rewrite_status.IsBusy()) { + // The key is overwritten in the meanwhile. Drop the blob record. + gc_stats->num_keys_overwritten++; + gc_stats->bytes_overwritten += record.record_size(); + } else { + // We hit an error. 
+ s = rewrite_status; + ROCKS_LOG_ERROR(db_options_.info_log, "Error while relocating key: %s", + s.ToString().c_str()); + break; + } + } // end of ReadRecord loop + + { + WriteLock wl(&mutex_); + ObsoleteBlobFile(bfptr, GetLatestSequenceNumber(), true /*update_size*/); + } + + ROCKS_LOG_INFO( + db_options_.info_log, + "%s blob file %" PRIu64 ". Total blob records: %" PRIu64 + ", expired: %" PRIu64 " keys/%" PRIu64 + " bytes, updated or deleted by user: %" PRIu64 " keys/%" PRIu64 + " bytes, rewrite to new file: %" PRIu64 " keys/%" PRIu64 " bytes.", + s.ok() ? "Successfully garbage collected" : "Failed to garbage collect", + bfptr->BlobFileNumber(), gc_stats->blob_count, gc_stats->num_keys_expired, + gc_stats->bytes_expired, gc_stats->num_keys_overwritten, + gc_stats->bytes_overwritten, gc_stats->num_keys_relocated, + gc_stats->bytes_relocated); + RecordTick(statistics_, BLOB_DB_GC_NUM_FILES); + RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_OVERWRITTEN, + gc_stats->num_keys_overwritten); + RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_EXPIRED, + gc_stats->num_keys_expired); + RecordTick(statistics_, BLOB_DB_GC_BYTES_OVERWRITTEN, + gc_stats->bytes_overwritten); + RecordTick(statistics_, BLOB_DB_GC_BYTES_EXPIRED, gc_stats->bytes_expired); + if (newfile != nullptr) { + { + MutexLock l(&write_mutex_); + CloseBlobFile(newfile); + } + total_blob_size_ += newfile->file_size_; + ROCKS_LOG_INFO(db_options_.info_log, "New blob file %" PRIu64 ".", + newfile->BlobFileNumber()); + RecordTick(statistics_, BLOB_DB_GC_NUM_NEW_FILES); + RecordTick(statistics_, BLOB_DB_GC_NUM_KEYS_RELOCATED, + gc_stats->num_keys_relocated); + RecordTick(statistics_, BLOB_DB_GC_BYTES_RELOCATED, + gc_stats->bytes_relocated); + } + if (!s.ok()) { + RecordTick(statistics_, BLOB_DB_GC_FAILURES); + } + return s; +} + +std::pair<bool, int64_t> BlobDBImpl::DeleteObsoleteFiles(bool aborted) { + if (aborted) { + return std::make_pair(false, -1); + } + + MutexLock delete_file_lock(&delete_file_mutex_); + if (disable_file_deletions_ > 0) { + return std::make_pair(true, -1); + } + + std::list<std::shared_ptr<BlobFile>> tobsolete; + { + WriteLock wl(&mutex_); + if (obsolete_files_.empty()) { + return std::make_pair(true, -1); + } + tobsolete.swap(obsolete_files_); + } + + bool file_deleted = false; + for (auto iter = tobsolete.begin(); iter != tobsolete.end();) { + auto bfile = *iter; + { + ReadLock lockbfile_r(&bfile->mutex_); + if (VisibleToActiveSnapshot(bfile)) { + ROCKS_LOG_INFO(db_options_.info_log, + "Could not delete file due to snapshot failure %s", + bfile->PathName().c_str()); + ++iter; + continue; + } + } + ROCKS_LOG_INFO(db_options_.info_log, + "Will delete file due to snapshot success %s", + bfile->PathName().c_str()); + + blob_files_.erase(bfile->BlobFileNumber()); + Status s = DeleteDBFile(&(db_impl_->immutable_db_options()), + bfile->PathName(), blob_dir_, true); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, + "File failed to be deleted as obsolete %s", + bfile->PathName().c_str()); + ++iter; + continue; + } + + file_deleted = true; + ROCKS_LOG_INFO(db_options_.info_log, + "File deleted as obsolete from blob dir %s", + bfile->PathName().c_str()); + + iter = tobsolete.erase(iter); + } + + // directory change. 
Fsync + if (file_deleted) { + Status s = dir_ent_->Fsync(); + if (!s.ok()) { + ROCKS_LOG_ERROR(db_options_.info_log, "Failed to sync dir %s: %s", + blob_dir_.c_str(), s.ToString().c_str()); + } + } + + // put files back into obsolete if for some reason, delete failed + if (!tobsolete.empty()) { + WriteLock wl(&mutex_); + for (auto bfile : tobsolete) { + obsolete_files_.push_front(bfile); + } + } + + return std::make_pair(!aborted, -1); +} + +void BlobDBImpl::CopyBlobFiles( + std::vector<std::shared_ptr<BlobFile>>* bfiles_copy) { + ReadLock rl(&mutex_); + for (auto const& p : blob_files_) { + bfiles_copy->push_back(p.second); + } +} + +std::pair<bool, int64_t> BlobDBImpl::RunGC(bool aborted) { + if (aborted) { + return std::make_pair(false, -1); + } + + // TODO(yiwu): Garbage collection implementation. + + // reschedule + return std::make_pair(true, -1); +} + +Iterator* BlobDBImpl::NewIterator(const ReadOptions& read_options) { + auto* cfd = + reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily())->cfd(); + // Get a snapshot to avoid blob file get deleted between we + // fetch and index entry and reading from the file. + ManagedSnapshot* own_snapshot = nullptr; + const Snapshot* snapshot = read_options.snapshot; + if (snapshot == nullptr) { + own_snapshot = new ManagedSnapshot(db_); + snapshot = own_snapshot->snapshot(); + } + auto* iter = db_impl_->NewIteratorImpl( + read_options, cfd, snapshot->GetSequenceNumber(), + nullptr /*read_callback*/, true /*allow_blob*/); + return new BlobDBIterator(own_snapshot, iter, this, env_, statistics_); +} + +Status DestroyBlobDB(const std::string& dbname, const Options& options, + const BlobDBOptions& bdb_options) { + const ImmutableDBOptions soptions(SanitizeOptions(dbname, options)); + Env* env = soptions.env; + + Status status; + std::string blobdir; + blobdir = (bdb_options.path_relative) ? 
dbname + "/" + bdb_options.blob_dir + : bdb_options.blob_dir; + + std::vector<std::string> filenames; + env->GetChildren(blobdir, &filenames); + + for (const auto& f : filenames) { + uint64_t number; + FileType type; + if (ParseFileName(f, &number, &type) && type == kBlobFile) { + Status del = DeleteDBFile(&soptions, blobdir + "/" + f, blobdir, true); + if (status.ok() && !del.ok()) { + status = del; + } + } + } + env->DeleteDir(blobdir); + + Status destroy = DestroyDB(dbname, options); + if (status.ok() && !destroy.ok()) { + status = destroy; + } + + return status; +} + +#ifndef NDEBUG +Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value) { + return GetBlobValue(key, index_entry, value); +} + +std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const { + ReadLock l(&mutex_); + std::vector<std::shared_ptr<BlobFile>> blob_files; + for (auto& p : blob_files_) { + blob_files.emplace_back(p.second); + } + return blob_files; +} + +std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles() + const { + ReadLock l(&mutex_); + std::vector<std::shared_ptr<BlobFile>> obsolete_files; + for (auto& bfile : obsolete_files_) { + obsolete_files.emplace_back(bfile); + } + return obsolete_files; +} + +void BlobDBImpl::TEST_DeleteObsoleteFiles() { + DeleteObsoleteFiles(false /*abort*/); +} + +Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) { + MutexLock l(&write_mutex_); + return CloseBlobFile(bfile); +} + +void BlobDBImpl::TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file, + SequenceNumber obsolete_seq, + bool update_size) { + return ObsoleteBlobFile(blob_file, obsolete_seq, update_size); +} + +Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile, + GCStats* gc_stats) { + return GCFileAndUpdateLSM(bfile, gc_stats); +} + +void BlobDBImpl::TEST_RunGC() { RunGC(false /*abort*/); } + +void BlobDBImpl::TEST_EvictExpiredFiles() { + EvictExpiredFiles(false /*abort*/); +} + +uint64_t BlobDBImpl::TEST_live_sst_size() { return live_sst_size_.load(); } +#endif // !NDEBUG + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl.h b/src/rocksdb/utilities/blob_db/blob_db_impl.h new file mode 100644 index 00000000..0a22c0ac --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_impl.h @@ -0,0 +1,425 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#pragma once + +#ifndef ROCKSDB_LITE + +#include <atomic> +#include <condition_variable> +#include <limits> +#include <list> +#include <memory> +#include <set> +#include <string> +#include <thread> +#include <unordered_map> +#include <utility> +#include <vector> + +#include "db/db_iter.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/db.h" +#include "rocksdb/listener.h" +#include "rocksdb/options.h" +#include "rocksdb/statistics.h" +#include "rocksdb/wal_filter.h" +#include "util/mutexlock.h" +#include "util/timer_queue.h" +#include "utilities/blob_db/blob_db.h" +#include "utilities/blob_db/blob_file.h" +#include "utilities/blob_db/blob_log_format.h" +#include "utilities/blob_db/blob_log_reader.h" +#include "utilities/blob_db/blob_log_writer.h" + +namespace rocksdb { + +class DBImpl; +class ColumnFamilyHandle; +class ColumnFamilyData; +struct FlushJobInfo; + +namespace blob_db { + +struct BlobCompactionContext; +class BlobDBImpl; +class BlobFile; + +// Comparator to sort "TTL" aware Blob files based on the lower value of +// TTL range. +struct BlobFileComparatorTTL { + bool operator()(const std::shared_ptr<BlobFile>& lhs, + const std::shared_ptr<BlobFile>& rhs) const; +}; + +struct BlobFileComparator { + bool operator()(const std::shared_ptr<BlobFile>& lhs, + const std::shared_ptr<BlobFile>& rhs) const; +}; + +struct GCStats { + uint64_t blob_count = 0; + uint64_t num_keys_overwritten = 0; + uint64_t num_keys_expired = 0; + uint64_t num_keys_relocated = 0; + uint64_t bytes_overwritten = 0; + uint64_t bytes_expired = 0; + uint64_t bytes_relocated = 0; +}; + +/** + * The implementation class for BlobDB. This manages the value + * part in TTL aware sequentially written files. These files are + * Garbage Collected. + */ +class BlobDBImpl : public BlobDB { + friend class BlobFile; + friend class BlobDBIterator; + + public: + // deletions check period + static constexpr uint32_t kDeleteCheckPeriodMillisecs = 2 * 1000; + + // gc percentage each check period + static constexpr uint32_t kGCFilePercentage = 100; + + // gc period + static constexpr uint32_t kGCCheckPeriodMillisecs = 60 * 1000; + + // sanity check task + static constexpr uint32_t kSanityCheckPeriodMillisecs = 20 * 60 * 1000; + + // how many random access open files can we tolerate + static constexpr uint32_t kOpenFilesTrigger = 100; + + // how often to schedule reclaim open files. + static constexpr uint32_t kReclaimOpenFilesPeriodMillisecs = 1 * 1000; + + // how often to schedule delete obs files periods + static constexpr uint32_t kDeleteObsoleteFilesPeriodMillisecs = 10 * 1000; + + // how often to schedule expired files eviction. 
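+  // (All the scheduling intervals above and below are in milliseconds,
+  // e.g. 10 * 1000 schedules a task roughly every 10 seconds on the timer
+  // queue.)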
+ static constexpr uint32_t kEvictExpiredFilesPeriodMillisecs = 10 * 1000; + + // when should oldest file be evicted: + // on reaching 90% of blob_dir_size + static constexpr double kEvictOldestFileAtSize = 0.9; + + using BlobDB::Put; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) override; + + using BlobDB::Get; + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value) override; + + Status Get(const ReadOptions& read_options, ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* value, + uint64_t* expiration) override; + + using BlobDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& read_options) override; + + using BlobDB::NewIterators; + virtual Status NewIterators( + const ReadOptions& /*read_options*/, + const std::vector<ColumnFamilyHandle*>& /*column_families*/, + std::vector<Iterator*>* /*iterators*/) override { + return Status::NotSupported("Not implemented"); + } + + using BlobDB::MultiGet; + virtual std::vector<Status> MultiGet( + const ReadOptions& read_options, + const std::vector<Slice>& keys, + std::vector<std::string>* values) override; + + virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; + + virtual Status Close() override; + + using BlobDB::PutWithTTL; + Status PutWithTTL(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t ttl) override; + + using BlobDB::PutUntil; + Status PutUntil(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration) override; + + BlobDBOptions GetBlobDBOptions() const override; + + BlobDBImpl(const std::string& dbname, const BlobDBOptions& bdb_options, + const DBOptions& db_options, + const ColumnFamilyOptions& cf_options); + + virtual Status DisableFileDeletions() override; + + virtual Status EnableFileDeletions(bool force) override; + + virtual Status GetLiveFiles(std::vector<std::string>&, + uint64_t* manifest_file_size, + bool flush_memtable = true) override; + virtual void GetLiveFilesMetaData(std::vector<LiveFileMetaData>*) override; + + ~BlobDBImpl(); + + Status Open(std::vector<ColumnFamilyHandle*>* handles); + + Status SyncBlobFiles() override; + + void UpdateLiveSSTSize(); + + void GetCompactionContext(BlobCompactionContext* context); + +#ifndef NDEBUG + Status TEST_GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value); + + std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const; + + std::vector<std::shared_ptr<BlobFile>> TEST_GetObsoleteFiles() const; + + Status TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile); + + void TEST_ObsoleteBlobFile(std::shared_ptr<BlobFile>& blob_file, + SequenceNumber obsolete_seq = 0, + bool update_size = true); + + Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile, + GCStats* gc_stats); + + void TEST_RunGC(); + + void TEST_EvictExpiredFiles(); + + void TEST_DeleteObsoleteFiles(); + + uint64_t TEST_live_sst_size(); + + const std::string& TEST_blob_dir() const { return blob_dir_; } +#endif // !NDEBUG + + private: + class GarbageCollectionWriteCallback; + class BlobInserter; + + // Create a snapshot if there isn't one in read options. + // Return true if a snapshot is created. 
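+  // The caller is responsible for releasing the snapshot once it is done
+  // reading, as GetImpl does after pinning the value.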
+ bool SetSnapshotIfNeeded(ReadOptions* read_options); + + Status GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, uint64_t* expiration = nullptr); + + Status GetBlobValue(const Slice& key, const Slice& index_entry, + PinnableSlice* value, uint64_t* expiration = nullptr); + + Slice GetCompressedSlice(const Slice& raw, + std::string* compression_output) const; + + // Close a file by appending a footer, and removes file from open files list. + Status CloseBlobFile(std::shared_ptr<BlobFile> bfile, bool need_lock = true); + + // Close a file if its size exceeds blob_file_size + Status CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile); + + // Mark file as obsolete and move the file to obsolete file list. + // + // REQUIRED: hold write lock of mutex_ or during DB open. + void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file, + SequenceNumber obsolete_seq, bool update_size); + + Status PutBlobValue(const WriteOptions& options, const Slice& key, + const Slice& value, uint64_t expiration, + WriteBatch* batch); + + Status AppendBlob(const std::shared_ptr<BlobFile>& bfile, + const std::string& headerbuf, const Slice& key, + const Slice& value, uint64_t expiration, + std::string* index_entry); + + // find an existing blob log file based on the expiration unix epoch + // if such a file does not exist, return nullptr + Status SelectBlobFileTTL(uint64_t expiration, + std::shared_ptr<BlobFile>* blob_file); + + // find an existing blob log file to append the value to + Status SelectBlobFile(std::shared_ptr<BlobFile>* blob_file); + + std::shared_ptr<BlobFile> FindBlobFileLocked(uint64_t expiration) const; + + // periodic sanity check. Bunch of checks + std::pair<bool, int64_t> SanityCheck(bool aborted); + + // delete files which have been garbage collected and marked + // obsolete. Check whether any snapshots exist which refer to + // the same + std::pair<bool, int64_t> DeleteObsoleteFiles(bool aborted); + + // Major task to garbage collect expired and deleted blobs + std::pair<bool, int64_t> RunGC(bool aborted); + + // periodically check if open blob files and their TTL's has expired + // if expired, close the sequential writer and make the file immutable + std::pair<bool, int64_t> EvictExpiredFiles(bool aborted); + + // if the number of open files, approaches ULIMIT's this + // task will close random readers, which are kept around for + // efficiency + std::pair<bool, int64_t> ReclaimOpenFiles(bool aborted); + + std::pair<bool, int64_t> RemoveTimerQ(TimerQueue* tq, bool aborted); + + // Adds the background tasks to the timer queue + void StartBackgroundTasks(); + + // add a new Blob File + std::shared_ptr<BlobFile> NewBlobFile(const std::string& reason); + + // collect all the blob log files from the blob directory + Status GetAllBlobFiles(std::set<uint64_t>* file_numbers); + + // Open all blob files found in blob_dir. + Status OpenAllBlobFiles(); + + Status GetBlobFileReader(const std::shared_ptr<BlobFile>& blob_file, + std::shared_ptr<RandomAccessFileReader>* reader); + + // hold write mutex on file and call. + // Close the above Random Access reader + void CloseRandomAccessLocked(const std::shared_ptr<BlobFile>& bfile); + + // hold write mutex on file and call + // creates a sequential (append) writer for this blobfile + Status CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile); + + // returns a Writer object for the file. If writer is not + // already present, creates one. 
Needs the write mutex to be held.
+  Status CheckOrCreateWriterLocked(const std::shared_ptr<BlobFile>& blob_file,
+                                   std::shared_ptr<Writer>* writer);
+
+  // Iterate through the keys and values in a blob file, write the surviving
+  // blobs into a separate file, and delete/update the pointers in the LSM
+  // atomically.
+  Status GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
+                            GCStats* gcstats);
+
+  // Checks whether any active snapshot still references the blobs in the
+  // file.
+  bool VisibleToActiveSnapshot(const std::shared_ptr<BlobFile>& file);
+  bool FileDeleteOk_SnapshotCheckLocked(const std::shared_ptr<BlobFile>& bfile);
+
+  void CopyBlobFiles(std::vector<std::shared_ptr<BlobFile>>* bfiles_copy);
+
+  uint64_t EpochNow() { return env_->NowMicros() / 1000000; }
+
+  // Check if inserting a new blob will make the DB grow out of space.
+  // If is_fifo = true, FIFO eviction will be triggered to make room for the
+  // new blob. If force_evict = true, FIFO eviction will evict blob files
+  // even if eviction will not make enough room for the new blob.
+  Status CheckSizeAndEvictBlobFiles(uint64_t blob_size,
+                                    bool force_evict = false);
+
+  // name of the database directory
+  std::string dbname_;
+
+  // the base DB
+  DBImpl* db_impl_;
+  Env* env_;
+
+  // the options that govern the behavior of Blob Storage
+  BlobDBOptions bdb_options_;
+  DBOptions db_options_;
+  ColumnFamilyOptions cf_options_;
+  EnvOptions env_options_;
+
+  // Raw pointer of statistic. db_options_ has a std::shared_ptr to hold
+  // ownership.
+  Statistics* statistics_;
+
+  // by default this is "blob_dir" under dbname_
+  // but can be configured
+  std::string blob_dir_;
+
+  // pointer to directory
+  std::unique_ptr<Directory> dir_ent_;
+
+  // Read Write Mutex, which protects all the data structures
+  // HEAVILY TRAFFICKED
+  mutable port::RWMutex mutex_;
+
+  // Writers have to hold write_mutex_ before writing.
+  mutable port::Mutex write_mutex_;
+
+  // counter for blob file number
+  std::atomic<uint64_t> next_file_number_;
+
+  // entire metadata of all the blob files, kept in memory
+  std::map<uint64_t, std::shared_ptr<BlobFile>> blob_files_;
+
+  // epoch or version of the open files.
+  std::atomic<uint64_t> epoch_of_;
+
+  // opened non-TTL blob file.
+  std::shared_ptr<BlobFile> open_non_ttl_file_;
+
+  // all the blob files which are currently being appended to, based
+  // on the variety of incoming TTLs
+  std::set<std::shared_ptr<BlobFile>, BlobFileComparatorTTL> open_ttl_files_;
+
+  // Flag to check whether Close() has been called on this DB
+  bool closed_;
+
+  // timer based queue to execute tasks
+  TimerQueue tqueue_;
+
+  // number of files opened for random access/GET
+  // counter is used to monitor and close excess RA files.
+  std::atomic<uint32_t> open_file_count_;
+
+  // Total size of all live blob files (i.e. excluding obsolete files).
+  std::atomic<uint64_t> total_blob_size_;
+
+  // total size of SST files.
+  std::atomic<uint64_t> live_sst_size_;
+
+  // Sequence number of the latest FIFO eviction.
+  //
+  // REQUIRES: access with mutex_ lock held.
+  uint64_t fifo_eviction_seq_;
+
+  // The expiration up to which the latest FIFO eviction evicts.
+  //
+  // REQUIRES: access with mutex_ lock held.
+  uint64_t evict_expiration_up_to_;
+
+  std::list<std::shared_ptr<BlobFile>> obsolete_files_;
+
+  // DeleteObsoleteFiles, DisableFileDeletions and EnableFileDeletions block
+  // on the mutex to avoid contention.
+  //
+  // While DeleteObsoleteFiles holds both mutex_ and delete_file_mutex_, note
+  // the difference: mutex_ only needs to be held while accessing the data
+  // structures, whereas delete_file_mutex_ needs to be held for the whole
+  // duration of DeleteObsoleteFiles to avoid it running simultaneously with
+  // DisableFileDeletions.
+  //
+  // If both mutex_ and delete_file_mutex_ need to be held, it is advised
+  // to acquire delete_file_mutex_ first to avoid deadlock.
+  mutable port::Mutex delete_file_mutex_;
+
+  // Each call of DisableFileDeletions will increase disable_file_deletions_
+  // by 1. EnableFileDeletions will either decrease the count by 1 or reset
+  // it to zero, depending on the force flag.
+  //
+  // REQUIRES: access with delete_file_mutex_ held.
+  int disable_file_deletions_ = 0;
+
+  uint32_t debug_level_;
+};
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc
new file mode 100644
index 00000000..8effe88c
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_db_impl_filesnapshot.cc
@@ -0,0 +1,108 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_db_impl.h"
+
+#include "util/filename.h"
+#include "util/logging.h"
+#include "util/mutexlock.h"
+
+// BlobDBImpl methods to get a snapshot of files, e.g. for replication.
+
+namespace rocksdb {
+namespace blob_db {
+
+Status BlobDBImpl::DisableFileDeletions() {
+  // Disable base DB file deletions.
+  Status s = db_impl_->DisableFileDeletions();
+  if (!s.ok()) {
+    return s;
+  }
+
+  int count = 0;
+  {
+    // Hold delete_file_mutex_ to make sure no DeleteObsoleteFiles job
+    // is running.
+    MutexLock l(&delete_file_mutex_);
+    count = ++disable_file_deletions_;
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log,
+                 "Disabled blob file deletions. count: %d", count);
+  return Status::OK();
+}
+
+Status BlobDBImpl::EnableFileDeletions(bool force) {
+  // Enable base DB file deletions.
+  Status s = db_impl_->EnableFileDeletions(force);
+  if (!s.ok()) {
+    return s;
+  }
+
+  int count = 0;
+  {
+    MutexLock l(&delete_file_mutex_);
+    if (force) {
+      disable_file_deletions_ = 0;
+    } else if (disable_file_deletions_ > 0) {
+      count = --disable_file_deletions_;
+    }
+    assert(count >= 0);
+  }
+
+  ROCKS_LOG_INFO(db_options_.info_log, "Enabled blob file deletions. count: %d",
+                 count);
+  // Consider triggering DeleteObsoleteFiles once after deletions are
+  // re-enabled, if we ever make the DeleteObsoleteFiles re-run interval
+  // configurable.
+  return Status::OK();
+}
+
+Status BlobDBImpl::GetLiveFiles(std::vector<std::string>& ret,
+                                uint64_t* manifest_file_size,
+                                bool flush_memtable) {
+  if (!bdb_options_.path_relative) {
+    return Status::NotSupported(
+        "Not able to get relative blob file path from absolute blob_dir.");
+  }
+  // Hold a lock in the beginning to avoid updates to the base DB during the
+  // call.
+  ReadLock rl(&mutex_);
+  Status s = db_->GetLiveFiles(ret, manifest_file_size, flush_memtable);
+  if (!s.ok()) {
+    return s;
+  }
+  ret.reserve(ret.size() + blob_files_.size());
+  for (auto bfile_pair : blob_files_) {
+    auto blob_file = bfile_pair.second;
+    // Path should be relative to db_name, but begin with a slash.
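+    // (Passing an empty string as the first component to BlobFileName
+    // produces exactly that form: a path relative to the DB directory with
+    // a leading slash.)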
+ ret.emplace_back( + BlobFileName("", bdb_options_.blob_dir, blob_file->BlobFileNumber())); + } + return Status::OK(); +} + +void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) { + // Path should be relative to db_name. + assert(bdb_options_.path_relative); + // Hold a lock in the beginning to avoid updates to base DB during the call + ReadLock rl(&mutex_); + db_->GetLiveFilesMetaData(metadata); + for (auto bfile_pair : blob_files_) { + auto blob_file = bfile_pair.second; + LiveFileMetaData filemetadata; + filemetadata.size = static_cast<size_t>(blob_file->GetFileSize()); + // Path should be relative to db_name, but begin with slash. + filemetadata.name = + BlobFileName("", bdb_options_.blob_dir, blob_file->BlobFileNumber()); + auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily()); + filemetadata.column_family_name = cfh->GetName(); + metadata->emplace_back(filemetadata); + } +} + +} // namespace blob_db +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_iterator.h b/src/rocksdb/utilities/blob_db/blob_db_iterator.h new file mode 100644 index 00000000..1565c670 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_iterator.h @@ -0,0 +1,148 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#ifndef ROCKSDB_LITE + +#include "monitoring/statistics.h" +#include "rocksdb/iterator.h" +#include "util/stop_watch.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace rocksdb { +namespace blob_db { + +using rocksdb::ManagedSnapshot; + +class BlobDBIterator : public Iterator { + public: + BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter, + BlobDBImpl* blob_db, Env* env, Statistics* statistics) + : snapshot_(snapshot), + iter_(iter), + blob_db_(blob_db), + env_(env), + statistics_(statistics) {} + + virtual ~BlobDBIterator() = default; + + bool Valid() const override { + if (!iter_->Valid()) { + return false; + } + return status_.ok(); + } + + Status status() const override { + if (!iter_->status().ok()) { + return iter_->status(); + } + return status_; + } + + void SeekToFirst() override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->SeekToFirst(); + while (UpdateBlobValue()) { + iter_->Next(); + } + } + + void SeekToLast() override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->SeekToLast(); + while (UpdateBlobValue()) { + iter_->Prev(); + } + } + + void Seek(const Slice& target) override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->Seek(target); + while (UpdateBlobValue()) { + iter_->Next(); + } + } + + void SeekForPrev(const Slice& target) override { + StopWatch seek_sw(env_, statistics_, BLOB_DB_SEEK_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_SEEK); + iter_->SeekForPrev(target); + while (UpdateBlobValue()) { + iter_->Prev(); + } + } + + void Next() override { + assert(Valid()); + StopWatch next_sw(env_, statistics_, BLOB_DB_NEXT_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_NEXT); + iter_->Next(); + while (UpdateBlobValue()) { + iter_->Next(); + } + } + + void Prev() override { + assert(Valid()); + StopWatch prev_sw(env_, statistics_, 
BLOB_DB_PREV_MICROS); + RecordTick(statistics_, BLOB_DB_NUM_PREV); + iter_->Prev(); + while (UpdateBlobValue()) { + iter_->Prev(); + } + } + + Slice key() const override { + assert(Valid()); + return iter_->key(); + } + + Slice value() const override { + assert(Valid()); + if (!iter_->IsBlob()) { + return iter_->value(); + } + return value_; + } + + // Iterator::Refresh() not supported. + + private: + // Return true if caller should continue to next value. + bool UpdateBlobValue() { + TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:1"); + TEST_SYNC_POINT("BlobDBIterator::UpdateBlobValue:Start:2"); + value_.Reset(); + status_ = Status::OK(); + if (iter_->Valid() && iter_->status().ok() && iter_->IsBlob()) { + Status s = blob_db_->GetBlobValue(iter_->key(), iter_->value(), &value_); + if (s.IsNotFound()) { + return true; + } else { + if (!s.ok()) { + status_ = s; + } + return false; + } + } else { + return false; + } + } + + std::unique_ptr<ManagedSnapshot> snapshot_; + std::unique_ptr<ArenaWrappedDBIter> iter_; + BlobDBImpl* blob_db_; + Env* env_; + Statistics* statistics_; + Status status_; + PinnableSlice value_; +}; +} // namespace blob_db +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_listener.h b/src/rocksdb/utilities/blob_db/blob_db_listener.h new file mode 100644 index 00000000..f096d238 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_listener.h @@ -0,0 +1,46 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include <atomic> + +#include "rocksdb/listener.h" +#include "util/mutexlock.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace rocksdb { +namespace blob_db { + +class BlobDBListener : public EventListener { + public: + explicit BlobDBListener(BlobDBImpl* blob_db_impl) + : blob_db_impl_(blob_db_impl) {} + + void OnFlushBegin(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->SyncBlobFiles(); + } + + void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + void OnCompactionCompleted(DB* /*db*/, + const CompactionJobInfo& /*info*/) override { + assert(blob_db_impl_ != nullptr); + blob_db_impl_->UpdateLiveSSTSize(); + } + + private: + BlobDBImpl* blob_db_impl_; +}; + +} // namespace blob_db +} // namespace rocksdb +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_db_test.cc b/src/rocksdb/utilities/blob_db/blob_db_test.cc new file mode 100644 index 00000000..afb953df --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_db_test.cc @@ -0,0 +1,1695 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+ +#ifndef ROCKSDB_LITE + +#include <algorithm> +#include <chrono> +#include <cstdlib> +#include <map> +#include <memory> +#include <string> +#include <vector> + +#include "db/db_test_util.h" +#include "port/port.h" +#include "rocksdb/utilities/debug.h" +#include "util/cast_util.h" +#include "util/fault_injection_test_env.h" +#include "util/file_util.h" +#include "util/random.h" +#include "util/sst_file_manager_impl.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/testharness.h" +#include "utilities/blob_db/blob_db.h" +#include "utilities/blob_db/blob_db_impl.h" +#include "utilities/blob_db/blob_index.h" + +namespace rocksdb { +namespace blob_db { + +class BlobDBTest : public testing::Test { + public: + const int kMaxBlobSize = 1 << 14; + + struct BlobRecord { + std::string key; + std::string value; + uint64_t expiration = 0; + }; + + BlobDBTest() + : dbname_(test::PerThreadDBPath("blob_db_test")), + mock_env_(new MockTimeEnv(Env::Default())), + fault_injection_env_(new FaultInjectionTestEnv(Env::Default())), + blob_db_(nullptr) { + Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions()); + assert(s.ok()); + } + + ~BlobDBTest() override { + SyncPoint::GetInstance()->ClearAllCallBacks(); + Destroy(); + } + + Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + options.create_if_missing = true; + return BlobDB::Open(options, bdb_options, dbname_, &blob_db_); + } + + void Open(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + ASSERT_OK(TryOpen(bdb_options, options)); + } + + void Reopen(BlobDBOptions bdb_options = BlobDBOptions(), + Options options = Options()) { + assert(blob_db_ != nullptr); + delete blob_db_; + blob_db_ = nullptr; + Open(bdb_options, options); + } + + void Close() { + assert(blob_db_ != nullptr); + delete blob_db_; + blob_db_ = nullptr; + } + + void Destroy() { + if (blob_db_) { + Options options = blob_db_->GetOptions(); + BlobDBOptions bdb_options = blob_db_->GetBlobDBOptions(); + delete blob_db_; + blob_db_ = nullptr; + ASSERT_OK(DestroyBlobDB(dbname_, options, bdb_options)); + } + } + + BlobDBImpl *blob_db_impl() { + return reinterpret_cast<BlobDBImpl *>(blob_db_); + } + + Status Put(const Slice &key, const Slice &value, + std::map<std::string, std::string> *data = nullptr) { + Status s = blob_db_->Put(WriteOptions(), key, value); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; + } + + void Delete(const std::string &key, + std::map<std::string, std::string> *data = nullptr) { + ASSERT_OK(blob_db_->Delete(WriteOptions(), key)); + if (data != nullptr) { + data->erase(key); + } + } + + Status PutWithTTL(const Slice &key, const Slice &value, uint64_t ttl, + std::map<std::string, std::string> *data = nullptr) { + Status s = blob_db_->PutWithTTL(WriteOptions(), key, value, ttl); + if (data != nullptr) { + (*data)[key.ToString()] = value.ToString(); + } + return s; + } + + Status PutUntil(const Slice &key, const Slice &value, uint64_t expiration) { + return blob_db_->PutUntil(WriteOptions(), key, value, expiration); + } + + void PutRandomWithTTL(const std::string &key, uint64_t ttl, Random *rnd, + std::map<std::string, std::string> *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = test::RandomHumanReadableString(rnd, len); + ASSERT_OK( + blob_db_->PutWithTTL(WriteOptions(), Slice(key), Slice(value), ttl)); + if (data != nullptr) { + (*data)[key] = value; + } + } + + void PutRandomUntil(const 
std::string &key, uint64_t expiration, Random *rnd, + std::map<std::string, std::string> *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = test::RandomHumanReadableString(rnd, len); + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value), + expiration)); + if (data != nullptr) { + (*data)[key] = value; + } + } + + void PutRandom(const std::string &key, Random *rnd, + std::map<std::string, std::string> *data = nullptr) { + PutRandom(blob_db_, key, rnd, data); + } + + void PutRandom(DB *db, const std::string &key, Random *rnd, + std::map<std::string, std::string> *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = test::RandomHumanReadableString(rnd, len); + ASSERT_OK(db->Put(WriteOptions(), Slice(key), Slice(value))); + if (data != nullptr) { + (*data)[key] = value; + } + } + + void PutRandomToWriteBatch( + const std::string &key, Random *rnd, WriteBatch *batch, + std::map<std::string, std::string> *data = nullptr) { + int len = rnd->Next() % kMaxBlobSize + 1; + std::string value = test::RandomHumanReadableString(rnd, len); + ASSERT_OK(batch->Put(key, value)); + if (data != nullptr) { + (*data)[key] = value; + } + } + + // Verify blob db contain expected data and nothing more. + void VerifyDB(const std::map<std::string, std::string> &data) { + VerifyDB(blob_db_, data); + } + + void VerifyDB(DB *db, const std::map<std::string, std::string> &data) { + // Verify normal Get + auto* cfh = db->DefaultColumnFamily(); + for (auto &p : data) { + PinnableSlice value_slice; + ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value_slice)); + ASSERT_EQ(p.second, value_slice.ToString()); + std::string value; + ASSERT_OK(db->Get(ReadOptions(), cfh, p.first, &value)); + ASSERT_EQ(p.second, value); + } + + // Verify iterators + Iterator *iter = db->NewIterator(ReadOptions()); + iter->SeekToFirst(); + for (auto &p : data) { + ASSERT_TRUE(iter->Valid()); + ASSERT_EQ(p.first, iter->key().ToString()); + ASSERT_EQ(p.second, iter->value().ToString()); + iter->Next(); + } + ASSERT_FALSE(iter->Valid()); + ASSERT_OK(iter->status()); + delete iter; + } + + void VerifyBaseDB( + const std::map<std::string, KeyVersion> &expected_versions) { + auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_); + DB *db = blob_db_->GetRootDB(); + const size_t kMaxKeys = 10000; + std::vector<KeyVersion> versions; + GetAllKeyVersions(db, "", "", kMaxKeys, &versions); + ASSERT_EQ(expected_versions.size(), versions.size()); + size_t i = 0; + for (auto &key_version : expected_versions) { + const KeyVersion &expected_version = key_version.second; + ASSERT_EQ(expected_version.user_key, versions[i].user_key); + ASSERT_EQ(expected_version.sequence, versions[i].sequence); + ASSERT_EQ(expected_version.type, versions[i].type); + if (versions[i].type == kTypeValue) { + ASSERT_EQ(expected_version.value, versions[i].value); + } else { + ASSERT_EQ(kTypeBlobIndex, versions[i].type); + PinnableSlice value; + ASSERT_OK(bdb_impl->TEST_GetBlobValue(versions[i].user_key, + versions[i].value, &value)); + ASSERT_EQ(expected_version.value, value.ToString()); + } + i++; + } + } + + void InsertBlobs() { + WriteOptions wo; + std::string value; + + Random rnd(301); + for (size_t i = 0; i < 100000; i++) { + uint64_t ttl = rnd.Next() % 86400; + PutRandomWithTTL("key" + ToString(i % 500), ttl, &rnd, nullptr); + } + + for (size_t i = 0; i < 10; i++) { + Delete("key" + ToString(i % 500)); + } + } + + const std::string dbname_; + std::unique_ptr<MockTimeEnv> mock_env_; + 
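+  // mock_env_ above lets tests control the wall clock for TTL expiry, while
+  // fault_injection_env_ below simulates filesystem failures for the
+  // error-path tests.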
std::unique_ptr<FaultInjectionTestEnv> fault_injection_env_; + BlobDB *blob_db_; +}; // class BlobDBTest + +TEST_F(BlobDBTest, Put) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, PutWithTTL) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map<std::string, std::string> data; + mock_env_->set_current_time(50); + for (size_t i = 0; i < 100; i++) { + uint64_t ttl = rnd.Next() % 100; + PutRandomWithTTL("key" + ToString(i), ttl, &rnd, + (ttl <= 50 ? nullptr : &data)); + } + mock_env_->set_current_time(100); + auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); + VerifyDB(data); +} + +TEST_F(BlobDBTest, PutUntil) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 1000; + bdb_options.min_blob_size = 0; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + std::map<std::string, std::string> data; + mock_env_->set_current_time(50); + for (size_t i = 0; i < 100; i++) { + uint64_t expiration = rnd.Next() % 100 + 50; + PutRandomUntil("key" + ToString(i), expiration, &rnd, + (expiration <= 100 ? 
nullptr : &data)); + } + mock_env_->set_current_time(100); + auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_); + auto blob_files = bdb_impl->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_TRUE(blob_files[0]->HasTTL()); + ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0])); + GCStats gc_stats; + ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired); + ASSERT_EQ(data.size(), gc_stats.num_keys_relocated); + VerifyDB(data); +} + +TEST_F(BlobDBTest, StackableDBGet) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + for (size_t i = 0; i < 100; i++) { + StackableDB *db = blob_db_; + ColumnFamilyHandle *column_family = db->DefaultColumnFamily(); + std::string key = "key" + ToString(i); + PinnableSlice pinnable_value; + ASSERT_OK(db->Get(ReadOptions(), column_family, key, &pinnable_value)); + std::string string_value; + ASSERT_OK(db->Get(ReadOptions(), column_family, key, &string_value)); + ASSERT_EQ(string_value, pinnable_value.ToString()); + ASSERT_EQ(string_value, data[key]); + } +} + +TEST_F(BlobDBTest, GetExpiration) { + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + mock_env_->set_current_time(100); + Open(bdb_options, options); + Put("key1", "value1"); + PutWithTTL("key2", "value2", 200); + PinnableSlice value; + uint64_t expiration; + ASSERT_OK(blob_db_->Get(ReadOptions(), "key1", &value, &expiration)); + ASSERT_EQ("value1", value.ToString()); + ASSERT_EQ(kNoExpiration, expiration); + ASSERT_OK(blob_db_->Get(ReadOptions(), "key2", &value, &expiration)); + ASSERT_EQ("value2", value.ToString()); + ASSERT_EQ(300 /* = 100 + 200 */, expiration); +} + +TEST_F(BlobDBTest, GetIOError) { + Options options; + options.env = fault_injection_env_.get(); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; // Make sure value write to blob file + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + ColumnFamilyHandle *column_family = blob_db_->DefaultColumnFamily(); + PinnableSlice value; + ASSERT_OK(Put("foo", "bar")); + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + Status s = blob_db_->Get(ReadOptions(), column_family, "foo", &value); + ASSERT_TRUE(s.IsIOError()); + // Reactivate file system to allow test to close DB. 
+ fault_injection_env_->SetFilesystemActive(true); +} + +TEST_F(BlobDBTest, PutIOError) { + Options options; + options.env = fault_injection_env_.get(); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; // Make sure value write to blob file + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + fault_injection_env_->SetFilesystemActive(false, Status::IOError()); + ASSERT_TRUE(Put("foo", "v1").IsIOError()); + fault_injection_env_->SetFilesystemActive(true, Status::IOError()); + ASSERT_OK(Put("bar", "v1")); +} + +TEST_F(BlobDBTest, WriteBatch) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + WriteBatch batch; + for (size_t j = 0; j < 10; j++) { + PutRandomToWriteBatch("key" + ToString(j * 100 + i), &rnd, &batch, &data); + } + blob_db_->Write(WriteOptions(), &batch); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, Delete) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + for (size_t i = 0; i < 100; i += 5) { + Delete("key" + ToString(i), &data); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, DeleteBatch) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd); + } + WriteBatch batch; + for (size_t i = 0; i < 100; i++) { + batch.Delete("key" + ToString(i)); + } + ASSERT_OK(blob_db_->Write(WriteOptions(), &batch)); + // DB should be empty. 
+ VerifyDB({}); +} + +TEST_F(BlobDBTest, Override) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (int i = 0; i < 10000; i++) { + PutRandom("key" + ToString(i), &rnd, nullptr); + } + // override all the keys + for (int i = 0; i < 10000; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + VerifyDB(data); +} + +#ifdef SNAPPY +TEST_F(BlobDBTest, Compression) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + bdb_options.compression = CompressionType::kSnappyCompression; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + PutRandom("put-key" + ToString(i), &rnd, &data); + } + for (int i = 0; i < 100; i++) { + WriteBatch batch; + for (size_t j = 0; j < 10; j++) { + PutRandomToWriteBatch("write-batch-key" + ToString(j * 100 + i), &rnd, + &batch, &data); + } + blob_db_->Write(WriteOptions(), &batch); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, DecompressAfterReopen) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + bdb_options.compression = CompressionType::kSnappyCompression; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + PutRandom("put-key" + ToString(i), &rnd, &data); + } + VerifyDB(data); + bdb_options.compression = CompressionType::kNoCompression; + Reopen(bdb_options); + VerifyDB(data); +} +#endif + +TEST_F(BlobDBTest, MultipleWriters) { + Open(BlobDBOptions()); + + std::vector<port::Thread> workers; + std::vector<std::map<std::string, std::string>> data_set(10); + for (uint32_t i = 0; i < 10; i++) + workers.push_back(port::Thread( + [&](uint32_t id) { + Random rnd(301 + id); + for (int j = 0; j < 100; j++) { + std::string key = "key" + ToString(id) + "_" + ToString(j); + if (id < 5) { + PutRandom(key, &rnd, &data_set[id]); + } else { + WriteBatch batch; + PutRandomToWriteBatch(key, &rnd, &batch, &data_set[id]); + blob_db_->Write(WriteOptions(), &batch); + } + } + }, + i)); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 10; i++) { + workers[i].join(); + data.insert(data_set[i].begin(), data_set[i].end()); + } + VerifyDB(data); +} + +TEST_F(BlobDBTest, GCAfterOverwriteKeys) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + DBImpl *db_impl = static_cast_with_check<DBImpl, DB>(blob_db_->GetBaseDB()); + std::map<std::string, std::string> data; + for (int i = 0; i < 200; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + // Test for data in SST + size_t new_keys = 0; + for (int i = 0; i < 100; i++) { + if (rnd.Next() % 2 == 1) { + new_keys++; + PutRandom("key" + ToString(i), &rnd, &data); + } + } + db_impl->TEST_FlushMemTable(true /*wait*/); + // Test for data in memtable + for (int i = 100; i < 200; i++) { + if (rnd.Next() % 2 == 1) { + new_keys++; + PutRandom("key" + ToString(i), &rnd, &data); + } + } + GCStats gc_stats; + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(200, gc_stats.blob_count); + ASSERT_EQ(0, gc_stats.num_keys_expired); + 
ASSERT_EQ(200 - new_keys, gc_stats.num_keys_relocated); + VerifyDB(data); +} + +TEST_F(BlobDBTest, GCRelocateKeyWhileOverwriting) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v1")); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", + "BlobDBImpl::PutUntil:Start"}, + {"BlobDBImpl::PutUntil:Finish", + "BlobDBImpl::GCFileAndUpdateLSM:BeforeRelocate"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + auto writer = port::Thread( + [this]() { ASSERT_OK(blob_db_->Put(WriteOptions(), "foo", "v2")); }); + + GCStats gc_stats; + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(1, gc_stats.blob_count); + ASSERT_EQ(0, gc_stats.num_keys_expired); + ASSERT_EQ(1, gc_stats.num_keys_overwritten); + ASSERT_EQ(0, gc_stats.num_keys_relocated); + writer.join(); + VerifyDB({{"foo", "v2"}}); +} + +TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { + Random rnd(301); + Options options; + options.env = mock_env_.get(); + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options, options); + mock_env_->set_current_time(100); + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200)); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0])); + mock_env_->set_current_time(300); + + SyncPoint::GetInstance()->LoadDependency( + {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", + "BlobDBImpl::PutUntil:Start"}, + {"BlobDBImpl::PutUntil:Finish", + "BlobDBImpl::GCFileAndUpdateLSM:BeforeDelete"}}); + SyncPoint::GetInstance()->EnableProcessing(); + + auto writer = port::Thread([this]() { + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v2", 400)); + }); + + GCStats gc_stats; + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats)); + ASSERT_EQ(1, gc_stats.blob_count); + ASSERT_EQ(1, gc_stats.num_keys_expired); + ASSERT_EQ(0, gc_stats.num_keys_relocated); + writer.join(); + VerifyDB({{"foo", "v2"}}); +} + +TEST_F(BlobDBTest, NewFileGeneratedFromGCShouldMarkAsImmutable) { + BlobDBOptions bdb_options; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + ASSERT_OK(Put("foo", "bar")); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + auto blob_file1 = blob_files[0]; + ASSERT_EQ(1, blob_files.size()); + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file1)); + GCStats gc_stats; + ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(blob_file1, &gc_stats)); + ASSERT_EQ(1, gc_stats.blob_count); + ASSERT_EQ(1, gc_stats.num_keys_relocated); + blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(2, blob_files.size()); + ASSERT_EQ(blob_file1, blob_files[0]); + ASSERT_TRUE(blob_files[1]->Immutable()); +} + +// This test is no longer valid since we now return an error when we go +// over the configured max_db_size. +// The test needs to be re-written later in such a way that writes continue +// after a GC happens. +TEST_F(BlobDBTest, DISABLED_GCOldestSimpleBlobFileWhenOutOfSpace) { + // Use mock env to stop wall clock. 
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 100;
+ bdb_options.blob_file_size = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key_with_ttl", value, 60));
+ for (int i = 0; i < 10; i++) {
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key" + ToString(i), value));
+ }
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(11, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->HasTTL());
+ ASSERT_TRUE(blob_files[0]->Immutable());
+ for (int i = 1; i <= 10; i++) {
+ ASSERT_FALSE(blob_files[i]->HasTTL());
+ if (i < 10) {
+ ASSERT_TRUE(blob_files[i]->Immutable());
+ }
+ }
+ blob_db_impl()->TEST_RunGC();
+ // The oldest simple blob file (i.e. blob_files[1]) has been selected for GC.
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(1, obsolete_files.size());
+ ASSERT_EQ(blob_files[1]->BlobFileNumber(),
+ obsolete_files[0]->BlobFileNumber());
+}
+
+TEST_F(BlobDBTest, ReadWhileGC) {
+ // Run the same test twice: once reading with Get() and once with an
+ // iterator.
+ for (int i = 0; i < 2; i++) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+ blob_db_->Put(WriteOptions(), "foo", "bar");
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ std::shared_ptr<BlobFile> bfile = blob_files[0];
+ uint64_t bfile_number = bfile->BlobFileNumber();
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+
+ switch (i) {
+ case 0:
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"BlobDBImpl::Get:AfterIndexEntryGet:1",
+ "BlobDBTest::ReadWhileGC:1"},
+ {"BlobDBTest::ReadWhileGC:2",
+ "BlobDBImpl::Get:AfterIndexEntryGet:2"}});
+ break;
+ case 1:
+ SyncPoint::GetInstance()->LoadDependency(
+ {{"BlobDBIterator::UpdateBlobValue:Start:1",
+ "BlobDBTest::ReadWhileGC:1"},
+ {"BlobDBTest::ReadWhileGC:2",
+ "BlobDBIterator::UpdateBlobValue:Start:2"}});
+ break;
+ }
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ auto reader = port::Thread([this, i]() {
+ std::string value;
+ std::vector<std::string> values;
+ std::vector<Status> statuses;
+ switch (i) {
+ case 0:
+ ASSERT_OK(blob_db_->Get(ReadOptions(), "foo", &value));
+ ASSERT_EQ("bar", value);
+ break;
+ case 1:
+ // VerifyDB uses an iterator to scan the DB,
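+ // so this read goes through BlobDBIterator::UpdateBlobValue and blocks
+ // on the sync points installed for case 1 above.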
+ VerifyDB({{"foo", "bar"}});
+ break;
+ }
+ });
+
+ TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:1");
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ ASSERT_EQ(1, gc_stats.blob_count);
+ ASSERT_EQ(1, gc_stats.num_keys_relocated);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ // The file shouldn't be deleted
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(bfile_number, blob_files[0]->BlobFileNumber());
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(1, obsolete_files.size());
+ ASSERT_EQ(bfile_number, obsolete_files[0]->BlobFileNumber());
+ TEST_SYNC_POINT("BlobDBTest::ReadWhileGC:2");
+ reader.join();
+ SyncPoint::GetInstance()->DisableProcessing();
+
+ // The file is deleted this time
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_NE(bfile_number, blob_files[0]->BlobFileNumber());
+ ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size());
+ VerifyDB({{"foo", "bar"}});
+ Destroy();
+ }
+}
+
+TEST_F(BlobDBTest, SstFileManager) {
+ // Use a very low delete rate so that file deletions go through the delete
+ // scheduler instead of being performed immediately.
+ std::shared_ptr<SstFileManager> sst_file_manager(
+ NewSstFileManager(mock_env_.get()));
+ sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ SstFileManagerImpl *sfm =
+ static_cast<SstFileManagerImpl *>(sst_file_manager.get());
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ Options db_options;
+
+ int files_deleted_directly = 0;
+ int files_scheduled_to_delete = 0;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion",
+ [&](void * /*arg*/) { files_scheduled_to_delete++; });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile",
+ [&](void * /*arg*/) { files_deleted_directly++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+ db_options.sst_file_manager = sst_file_manager;
+
+ Open(bdb_options, db_options);
+
+ // Create one obsolete file and clean it.
+ blob_db_->Put(WriteOptions(), "foo", "bar");
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ std::shared_ptr<BlobFile> bfile = blob_files[0];
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+
+ // The obsolete blob file should go through the delete scheduler rather
+ // than being deleted directly.
+ ASSERT_EQ(1, files_scheduled_to_delete);
+ ASSERT_EQ(0, files_deleted_directly);
+ Destroy();
+ // Make sure that DestroyBlobDB() also goes through delete scheduler.
+ ASSERT_GE(files_scheduled_to_delete, 2);
+ // Due to a timing issue, the WAL may or may not be deleted directly. The
+ // blob file is first scheduled, followed by WAL.
If the background trash
+ // thread does not wake up on time, the WAL file will be directly
+ // deleted as the trash size will be > DB size.
+ ASSERT_LE(files_deleted_directly, 1);
+ SyncPoint::GetInstance()->DisableProcessing();
+ sfm->WaitForEmptyTrash();
+}
+
+TEST_F(BlobDBTest, SstFileManagerRestart) {
+ int files_deleted_directly = 0;
+ int files_scheduled_to_delete = 0;
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "SstFileManagerImpl::ScheduleFileDeletion",
+ [&](void * /*arg*/) { files_scheduled_to_delete++; });
+ rocksdb::SyncPoint::GetInstance()->SetCallBack(
+ "DeleteScheduler::DeleteFile",
+ [&](void * /*arg*/) { files_deleted_directly++; });
+
+ // Use a very low delete rate so that trash files are not removed
+ // immediately and deletions go through the delete scheduler.
+ std::shared_ptr<SstFileManager> sst_file_manager(
+ NewSstFileManager(mock_env_.get()));
+ sst_file_manager->SetDeleteRateBytesPerSecond(1);
+ SstFileManagerImpl *sfm =
+ static_cast<SstFileManagerImpl *>(sst_file_manager.get());
+
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ Options db_options;
+
+ SyncPoint::GetInstance()->EnableProcessing();
+ db_options.sst_file_manager = sst_file_manager;
+
+ Open(bdb_options, db_options);
+ std::string blob_dir = blob_db_impl()->TEST_blob_dir();
+ blob_db_->Put(WriteOptions(), "foo", "bar");
+ Close();
+
+ // Create 3 dummy trash files under the blob_dir
+ CreateFile(db_options.env, blob_dir + "/000666.blob.trash", "", false);
+ CreateFile(db_options.env, blob_dir + "/000888.blob.trash", "", true);
+ CreateFile(db_options.env, blob_dir + "/something_not_match.trash", "",
+ false);
+
+ // Make sure that reopening the DB rescans the existing trash files
+ Open(bdb_options, db_options);
+ ASSERT_GE(files_scheduled_to_delete, 3);
+ // Depending on timing, the WAL file may or may not be directly deleted
+ ASSERT_LE(files_deleted_directly, 1);
+
+ sfm->WaitForEmptyTrash();
+
+ // There should be exactly one file under the blob dir now.
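+ // (The one survivor should be the blob file written before Close(); the
+ // *.trash files should all have been consumed by the delete scheduler.)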
+ std::vector<std::string> all_files;
+ ASSERT_OK(db_options.env->GetChildren(blob_dir, &all_files));
+ int nfiles = 0;
+ for (const auto &f : all_files) {
+ assert(!f.empty());
+ if (f[0] == '.') {
+ continue;
+ }
+ nfiles++;
+ }
+ ASSERT_EQ(nfiles, 1);
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+TEST_F(BlobDBTest, SnapshotAndGarbageCollection) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ // i = when to take snapshot
+ for (int i = 0; i < 4; i++) {
+ for (bool delete_key : {true, false}) {
+ const Snapshot *snapshot = nullptr;
+ Destroy();
+ Open(bdb_options);
+ // First file
+ ASSERT_OK(Put("key1", "value"));
+ if (i == 0) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+ // Second file
+ ASSERT_OK(Put("key2", "value"));
+ if (i == 1) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ auto bfile = blob_files[1];
+ ASSERT_FALSE(bfile->Immutable());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(bfile));
+ // Third file
+ ASSERT_OK(Put("key3", "value"));
+ if (i == 2) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ if (delete_key) {
+ Delete("key2");
+ }
+ GCStats gc_stats;
+ ASSERT_OK(blob_db_impl()->TEST_GCFileAndUpdateLSM(bfile, &gc_stats));
+ ASSERT_TRUE(bfile->Obsolete());
+ ASSERT_EQ(1, gc_stats.blob_count);
+ if (delete_key) {
+ ASSERT_EQ(0, gc_stats.num_keys_relocated);
+ } else {
+ ASSERT_EQ(1, gc_stats.num_keys_relocated);
+ }
+ ASSERT_EQ(blob_db_->GetLatestSequenceNumber(),
+ bfile->GetObsoleteSequence());
+ if (i == 3) {
+ snapshot = blob_db_->GetSnapshot();
+ }
+ size_t num_files = delete_key ? 3 : 4;
+ ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ if (i == 3) {
+ // The snapshot shouldn't see data in bfile
+ ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ } else {
+ // The snapshot will see data in bfile, so the file shouldn't be deleted
+ ASSERT_EQ(num_files, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_->ReleaseSnapshot(snapshot);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(num_files - 1, blob_db_impl()->TEST_GetBlobFiles().size());
+ }
+ }
+ }
+}
+
+TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
+ Options options;
+ options.env = mock_env_.get();
+ mock_env_->set_current_time(0);
+ Open(BlobDBOptions(), options);
+ ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
+ ColumnFamilyHandle *handle = nullptr;
+ std::string value;
+ std::vector<std::string> values;
+ // The call simply passes through to the base db. It should succeed.
+ ASSERT_OK( + blob_db_->CreateColumnFamily(ColumnFamilyOptions(), "foo", &handle)); + ASSERT_TRUE(blob_db_->Put(WriteOptions(), handle, "k", "v").IsNotSupported()); + ASSERT_TRUE(blob_db_->PutWithTTL(WriteOptions(), handle, "k", "v", 60) + .IsNotSupported()); + ASSERT_TRUE(blob_db_->PutUntil(WriteOptions(), handle, "k", "v", 100) + .IsNotSupported()); + WriteBatch batch; + batch.Put("k1", "v1"); + batch.Put(handle, "k2", "v2"); + ASSERT_TRUE(blob_db_->Write(WriteOptions(), &batch).IsNotSupported()); + ASSERT_TRUE(blob_db_->Get(ReadOptions(), "k1", &value).IsNotFound()); + ASSERT_TRUE( + blob_db_->Get(ReadOptions(), handle, "k", &value).IsNotSupported()); + auto statuses = blob_db_->MultiGet(ReadOptions(), {default_handle, handle}, + {"k1", "k2"}, &values); + ASSERT_EQ(2, statuses.size()); + ASSERT_TRUE(statuses[0].IsNotSupported()); + ASSERT_TRUE(statuses[1].IsNotSupported()); + ASSERT_EQ(nullptr, blob_db_->NewIterator(ReadOptions(), handle)); + delete handle; +} + +TEST_F(BlobDBTest, GetLiveFilesMetaData) { + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.blob_dir = "blob_dir"; + bdb_options.path_relative = true; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (size_t i = 0; i < 100; i++) { + PutRandom("key" + ToString(i), &rnd, &data); + } + std::vector<LiveFileMetaData> metadata; + blob_db_->GetLiveFilesMetaData(&metadata); + ASSERT_EQ(1U, metadata.size()); + // Path should be relative to db_name, but begin with slash. + std::string filename = "/blob_dir/000001.blob"; + ASSERT_EQ(filename, metadata[0].name); + ASSERT_EQ("default", metadata[0].column_family_name); + std::vector<std::string> livefile; + uint64_t mfs; + ASSERT_OK(blob_db_->GetLiveFiles(livefile, &mfs, false)); + ASSERT_EQ(4U, livefile.size()); + ASSERT_EQ(filename, livefile[3]); + VerifyDB(data); +} + +TEST_F(BlobDBTest, MigrateFromPlainRocksDB) { + constexpr size_t kNumKey = 20; + constexpr size_t kNumIteration = 10; + Random rnd(301); + std::map<std::string, std::string> data; + std::vector<bool> is_blob(kNumKey, false); + + // Write to plain rocksdb. + Options options; + options.create_if_missing = true; + DB *db = nullptr; + ASSERT_OK(DB::Open(options, dbname_, &db)); + for (size_t i = 0; i < kNumIteration; i++) { + auto key_index = rnd.Next() % kNumKey; + std::string key = "key" + ToString(key_index); + PutRandom(db, key, &rnd, &data); + } + VerifyDB(db, data); + delete db; + db = nullptr; + + // Open as blob db. Verify it can read existing data. + Open(); + VerifyDB(blob_db_, data); + for (size_t i = 0; i < kNumIteration; i++) { + auto key_index = rnd.Next() % kNumKey; + std::string key = "key" + ToString(key_index); + is_blob[key_index] = true; + PutRandom(blob_db_, key, &rnd, &data); + } + VerifyDB(blob_db_, data); + delete blob_db_; + blob_db_ = nullptr; + + // Verify plain db return error for keys written by blob db. + ASSERT_OK(DB::Open(options, dbname_, &db)); + std::string value; + for (size_t i = 0; i < kNumKey; i++) { + std::string key = "key" + ToString(i); + Status s = db->Get(ReadOptions(), key, &value); + if (data.count(key) == 0) { + ASSERT_TRUE(s.IsNotFound()); + } else if (is_blob[i]) { + ASSERT_TRUE(s.IsNotSupported()); + } else { + ASSERT_OK(s); + ASSERT_EQ(data[key], value); + } + } + delete db; +} + +// Test to verify that a NoSpace IOError Status is returned on reaching +// max_db_size limit. +TEST_F(BlobDBTest, OutOfSpace) { + // Use mock env to stop wall clock. 
+ Options options;
+ options.env = mock_env_.get();
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 200;
+ bdb_options.is_fifo = false;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ // Each stored blob has an overhead of about 42 bytes currently.
+ // So a small key + a 100 byte blob should take up ~150 bytes in the db.
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 60));
+
+ // Putting another blob should fail as adding it would exceed the
+ // max_db_size limit.
+ Status s = blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60);
+ ASSERT_TRUE(s.IsIOError());
+ ASSERT_TRUE(s.IsNoSpace());
+}
+
+TEST_F(BlobDBTest, FIFOEviction) {
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 200;
+ bdb_options.blob_file_size = 100;
+ bdb_options.is_fifo = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ std::atomic<int> evict_count{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictOldestBlobFile:Evicted",
+ [&](void *) { evict_count++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ // Each stored blob has an overhead of 32 bytes currently.
+ // So a 100 byte blob should take up 132 bytes.
+ std::string value(100, 'v');
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key1", value, 10));
+ VerifyDB({{"key1", value}});
+
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+
+ // Adding another 100-byte blob would take the total size to 264 bytes
+ // (2 * 132), which exceeds max_db_size and triggers FIFO eviction.
+ ASSERT_OK(blob_db_->PutWithTTL(WriteOptions(), "key2", value, 60));
+ ASSERT_EQ(1, evict_count);
+ // key1 will exist until the corresponding file is deleted.
+ VerifyDB({{"key1", value}, {"key2", value}});
+
+ // Add another 100-byte blob without TTL.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key3", value));
+ ASSERT_EQ(2, evict_count);
+ // key1 and key2 will exist until the corresponding files are deleted.
+ VerifyDB({{"key1", value}, {"key2", value}, {"key3", value}});
+
+ // The fourth blob file, without TTL.
+ ASSERT_OK(blob_db_->Put(WriteOptions(), "key4", value));
+ ASSERT_EQ(3, evict_count);
+ VerifyDB(
+ {{"key1", value}, {"key2", value}, {"key3", value}, {"key4", value}});
+
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(4, blob_files.size());
+ ASSERT_TRUE(blob_files[0]->Obsolete());
+ ASSERT_TRUE(blob_files[1]->Obsolete());
+ ASSERT_TRUE(blob_files[2]->Obsolete());
+ ASSERT_FALSE(blob_files[3]->Obsolete());
+ auto obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_EQ(3, obsolete_files.size());
+ ASSERT_EQ(blob_files[0], obsolete_files[0]);
+ ASSERT_EQ(blob_files[1], obsolete_files[1]);
+ ASSERT_EQ(blob_files[2], obsolete_files[2]);
+
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ obsolete_files = blob_db_impl()->TEST_GetObsoleteFiles();
+ ASSERT_TRUE(obsolete_files.empty());
+ VerifyDB({{"key4", value}});
+}
+
+TEST_F(BlobDBTest, FIFOEviction_NoOldestFileToEvict) {
+ Options options;
+ BlobDBOptions bdb_options;
+ bdb_options.max_db_size = 1000;
+ bdb_options.blob_file_size = 5000;
+ bdb_options.is_fifo = true;
+ bdb_options.disable_background_tasks = true;
+ Open(bdb_options);
+
+ std::atomic<int> evict_count{0};
+ SyncPoint::GetInstance()->SetCallBack(
+ "BlobDBImpl::EvictOldestBlobFile:Evicted",
+ [&](void *) { evict_count++; });
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ std::string value(2000, 'v');
+ ASSERT_TRUE(Put("foo", std::string(2000, 'v')).IsNoSpace());
+ ASSERT_EQ(0, evict_count);
+}
+
+TEST_F(BlobDBTest, FIFOEviction_NoEnoughBlobFilesToEvict) {
+ BlobDBOptions bdb_options;
+ bdb_options.is_fifo = true;
+ bdb_options.min_blob_size = 100;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ options.env = mock_env_.get();
+ options.disable_auto_compactions = true;
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ Open(bdb_options, options);
+
+ ASSERT_EQ(0, blob_db_impl()->TEST_live_sst_size());
+ std::string small_value(50, 'v');
+ std::map<std::string, std::string> data;
+ // Insert some data into the LSM tree to make sure FIFO eviction takes SST
+ // file size into account.
+ for (int i = 0; i < 1000; i++) {
+ ASSERT_OK(Put("key" + ToString(i), small_value, &data));
+ }
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ uint64_t live_sst_size = 0;
+ ASSERT_TRUE(blob_db_->GetIntProperty(DB::Properties::kTotalSstFilesSize,
+ &live_sst_size));
+ ASSERT_TRUE(live_sst_size > 0);
+ ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
+
+ bdb_options.max_db_size = live_sst_size + 2000;
+ Reopen(bdb_options, options);
+ ASSERT_EQ(live_sst_size, blob_db_impl()->TEST_live_sst_size());
+
+ std::string value_1k(1000, 'v');
+ ASSERT_OK(PutWithTTL("large_key1", value_1k, 60, &data));
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ VerifyDB(data);
+ // large_key2 evicts large_key1
+ ASSERT_OK(PutWithTTL("large_key2", value_1k, 60, &data));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ data.erase("large_key1");
+ VerifyDB(data);
+ // large_key3 would not get enough space even after evicting large_key2, so
+ // the put returns a no-space error instead.
+ std::string value_2k(2000, 'v');
+ ASSERT_TRUE(PutWithTTL("large_key3", value_2k, 60).IsNoSpace());
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ // Verify large_key2 still exists.
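+ // (Only large_key1 was erased from the expected map above, so data still
+ // contains large_key2 and all the small keys.)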
+ VerifyDB(data); +} + +// Test flush or compaction will trigger FIFO eviction since they update +// total SST file size. +TEST_F(BlobDBTest, FIFOEviction_TriggerOnSSTSizeChange) { + BlobDBOptions bdb_options; + bdb_options.max_db_size = 1000; + bdb_options.is_fifo = true; + bdb_options.min_blob_size = 100; + bdb_options.disable_background_tasks = true; + Options options; + // Use mock env to stop wall clock. + options.env = mock_env_.get(); + auto statistics = CreateDBStatistics(); + options.statistics = statistics; + options.compression = kNoCompression; + Open(bdb_options, options); + + std::string value(800, 'v'); + ASSERT_OK(PutWithTTL("large_key", value, 60)); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB({{"large_key", value}}); + + // Insert some small keys and flush to bring DB out of space. + std::map<std::string, std::string> data; + for (int i = 0; i < 10; i++) { + ASSERT_OK(Put("key" + ToString(i), "v", &data)); + } + ASSERT_OK(blob_db_->Flush(FlushOptions())); + + // Verify large_key is deleted by FIFO eviction. + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED)); + VerifyDB(data); +} + +TEST_F(BlobDBTest, InlineSmallValues) { + constexpr uint64_t kMaxExpiration = 1000; + Random rnd(301); + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = kMaxExpiration; + bdb_options.min_blob_size = 100; + bdb_options.blob_file_size = 256 * 1000 * 1000; + bdb_options.disable_background_tasks = true; + Options options; + options.env = mock_env_.get(); + mock_env_->set_current_time(0); + Open(bdb_options, options); + std::map<std::string, std::string> data; + std::map<std::string, KeyVersion> versions; + for (size_t i = 0; i < 1000; i++) { + bool is_small_value = rnd.Next() % 2; + bool has_ttl = rnd.Next() % 2; + uint64_t expiration = rnd.Next() % kMaxExpiration; + int len = is_small_value ? 50 : 200; + std::string key = "key" + ToString(i); + std::string value = test::RandomHumanReadableString(&rnd, len); + std::string blob_index; + data[key] = value; + SequenceNumber sequence = blob_db_->GetLatestSequenceNumber() + 1; + if (!has_ttl) { + ASSERT_OK(blob_db_->Put(WriteOptions(), key, value)); + } else { + ASSERT_OK(blob_db_->PutUntil(WriteOptions(), key, value, expiration)); + } + ASSERT_EQ(blob_db_->GetLatestSequenceNumber(), sequence); + versions[key] = + KeyVersion(key, value, sequence, + (is_small_value && !has_ttl) ? 
kTypeValue : kTypeBlobIndex);
+ }
+ VerifyDB(data);
+ VerifyBaseDB(versions);
+ auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
+ auto blob_files = bdb_impl->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ std::shared_ptr<BlobFile> non_ttl_file;
+ std::shared_ptr<BlobFile> ttl_file;
+ if (blob_files[0]->HasTTL()) {
+ ttl_file = blob_files[0];
+ non_ttl_file = blob_files[1];
+ } else {
+ non_ttl_file = blob_files[0];
+ ttl_file = blob_files[1];
+ }
+ ASSERT_FALSE(non_ttl_file->HasTTL());
+ ASSERT_TRUE(ttl_file->HasTTL());
+}
+
+TEST_F(BlobDBTest, CompactionFilterNotSupported) {
+ class TestCompactionFilter : public CompactionFilter {
+ const char *Name() const override { return "TestCompactionFilter"; }
+ };
+ class TestCompactionFilterFactory : public CompactionFilterFactory {
+ const char *Name() const override { return "TestCompactionFilterFactory"; }
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context & /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(new TestCompactionFilter());
+ }
+ };
+ for (int i = 0; i < 2; i++) {
+ Options options;
+ if (i == 0) {
+ options.compaction_filter = new TestCompactionFilter();
+ } else {
+ options.compaction_filter_factory.reset(
+ new TestCompactionFilterFactory());
+ }
+ ASSERT_TRUE(TryOpen(BlobDBOptions(), options).IsNotSupported());
+ delete options.compaction_filter;
+ }
+}
+
+// Test that the compaction filter removes any expired blob index.
+TEST_F(BlobDBTest, FilterExpiredBlobIndex) {
+ constexpr size_t kNumKeys = 100;
+ constexpr size_t kNumPuts = 1000;
+ constexpr uint64_t kMaxExpiration = 1000;
+ constexpr uint64_t kCompactTime = 500;
+ constexpr uint64_t kMinBlobSize = 100;
+ Random rnd(301);
+ mock_env_->set_current_time(0);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = kMinBlobSize;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ for (size_t i = 0; i < kNumPuts; i++) {
+ bool is_small_value = rnd.Next() % 2;
+ bool has_ttl = rnd.Next() % 2;
+ uint64_t expiration = rnd.Next() % kMaxExpiration;
+ int len = is_small_value ? 10 : 200;
+ std::string key = "key" + ToString(rnd.Next() % kNumKeys);
+ std::string value = test::RandomHumanReadableString(&rnd, len);
+ if (!has_ttl) {
+ if (is_small_value) {
+ std::string blob_entry;
+ BlobIndex::EncodeInlinedTTL(&blob_entry, expiration, value);
+ // Fake blob index with TTL. See what it will do.
+ ASSERT_GT(kMinBlobSize, blob_entry.size());
+ value = blob_entry;
+ }
+ ASSERT_OK(Put(key, value));
+ data_after_compact[key] = value;
+ } else {
+ ASSERT_OK(PutUntil(key, value, expiration));
+ if (expiration <= kCompactTime) {
+ data_after_compact.erase(key);
+ } else {
+ data_after_compact[key] = value;
+ }
+ }
+ data[key] = value;
+ }
+ VerifyDB(data);
+
+ mock_env_->set_current_time(kCompactTime);
+ // Take a snapshot before compaction. Make sure expired blob indexes are
+ // filtered regardless of snapshot.
+ const Snapshot *snapshot = blob_db_->GetSnapshot();
+ // Issue manual compaction to trigger the compaction filter.
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(),
+ blob_db_->DefaultColumnFamily(), nullptr,
+ nullptr));
+ blob_db_->ReleaseSnapshot(snapshot);
+ // Verify expired blob indexes are filtered.
+ std::vector<KeyVersion> versions;
+ const size_t kMaxKeys = 10000;
+ GetAllKeyVersions(blob_db_, "", "", kMaxKeys, &versions);
+ ASSERT_EQ(data_after_compact.size(), versions.size());
+ for (auto &version : versions) {
+ ASSERT_TRUE(data_after_compact.count(version.user_key) > 0);
+ }
+ VerifyDB(data_after_compact);
+}
+
+// Test that the compaction filter removes any blob index whose corresponding
+// blob file has been removed (either by FIFO eviction or garbage collection).
+TEST_F(BlobDBTest, FilterFileNotAvailable) {
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.disable_auto_compactions = true;
+ Open(bdb_options, options);
+
+ ASSERT_OK(Put("foo", "v1"));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ ASSERT_EQ(1, blob_files[0]->BlobFileNumber());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[0]));
+
+ ASSERT_OK(Put("bar", "v2"));
+ blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(2, blob_files.size());
+ ASSERT_EQ(2, blob_files[1]->BlobFileNumber());
+ ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_files[1]));
+
+ const size_t kMaxKeys = 10000;
+
+ DB *base_db = blob_db_->GetRootDB();
+ std::vector<KeyVersion> versions;
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(2, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ ASSERT_EQ("foo", versions[1].user_key);
+ VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(2, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ ASSERT_EQ("foo", versions[1].user_key);
+ VerifyDB({{"bar", "v2"}, {"foo", "v1"}});
+
+ // Remove the first blob file and compact. foo should be removed from the
+ // base db.
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[0]);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(1, versions.size());
+ ASSERT_EQ("bar", versions[0].user_key);
+ VerifyDB({{"bar", "v2"}});
+
+ // Remove the second blob file and compact. bar should be removed from the
+ // base db.
+ blob_db_impl()->TEST_ObsoleteBlobFile(blob_files[1]);
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ ASSERT_OK(GetAllKeyVersions(base_db, "", "", kMaxKeys, &versions));
+ ASSERT_EQ(0, versions.size());
+ VerifyDB({});
+}
+
+// Test that the compaction filter filters any inlined TTL keys that would
+// have been dropped by the last FIFO eviction if they were stored
+// out-of-line.
+TEST_F(BlobDBTest, FilterForFIFOEviction) {
+ Random rnd(215);
+ BlobDBOptions bdb_options;
+ bdb_options.min_blob_size = 100;
+ bdb_options.ttl_range_secs = 60;
+ bdb_options.max_db_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ // Use mock env to stop wall clock.
+ mock_env_->set_current_time(0);
+ options.env = mock_env_.get();
+ auto statistics = CreateDBStatistics();
+ options.statistics = statistics;
+ options.disable_auto_compactions = true;
+ Open(bdb_options, options);
+
+ std::map<std::string, std::string> data;
+ std::map<std::string, std::string> data_after_compact;
+ // Insert some small values that will be inlined.
+ for (int i = 0; i < 1000; i++) {
+ std::string key = "key" + ToString(i);
+ std::string value = test::RandomHumanReadableString(&rnd, 50);
+ uint64_t ttl = rnd.Next() % 120 + 1;
+ ASSERT_OK(PutWithTTL(key, value, ttl, &data));
+ if (ttl >= 60) {
+ data_after_compact[key] = value;
+ }
+ }
+ uint64_t num_keys_to_evict = data.size() - data_after_compact.size();
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ uint64_t live_sst_size = blob_db_impl()->TEST_live_sst_size();
+ ASSERT_GT(live_sst_size, 0);
+ VerifyDB(data);
+
+ bdb_options.max_db_size = live_sst_size + 30000;
+ bdb_options.is_fifo = true;
+ Reopen(bdb_options, options);
+ VerifyDB(data);
+
+ // Put two large values, each on a different blob file.
+ std::string large_value(10000, 'v');
+ ASSERT_OK(PutWithTTL("large_key1", large_value, 90));
+ ASSERT_OK(PutWithTTL("large_key2", large_value, 150));
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ ASSERT_EQ(0, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ data["large_key1"] = large_value;
+ data["large_key2"] = large_value;
+ VerifyDB(data);
+
+ // Put a third large value which will bring the DB out of space.
+ // FIFO eviction will evict the file of large_key1.
+ ASSERT_OK(PutWithTTL("large_key3", large_value, 150));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ ASSERT_EQ(2, blob_db_impl()->TEST_GetBlobFiles().size());
+ blob_db_impl()->TEST_DeleteObsoleteFiles();
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ data.erase("large_key1");
+ data["large_key3"] = large_value;
+ VerifyDB(data);
+
+ // Put some more small values. These values shouldn't be evicted by the
+ // compaction filter since they are inserted after the FIFO eviction.
+ ASSERT_OK(PutWithTTL("foo", "v", 30, &data_after_compact));
+ ASSERT_OK(PutWithTTL("bar", "v", 30, &data_after_compact));
+
+ // FIFO eviction doesn't trigger again since there is enough room for the
+ // flush.
+ ASSERT_OK(blob_db_->Flush(FlushOptions()));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+
+ // Compact manually and check that the compaction filter evicts those keys
+ // with expiration < 60.
+ ASSERT_OK(blob_db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
+ // All keys with expiration < 60, plus large_key1, are filtered by the
+ // compaction filter.
+ ASSERT_EQ(num_keys_to_evict + 1,
+ statistics->getTickerCount(BLOB_DB_BLOB_INDEX_EVICTED_COUNT));
+ ASSERT_EQ(1, statistics->getTickerCount(BLOB_DB_FIFO_NUM_FILES_EVICTED));
+ ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size());
+ data_after_compact["large_key2"] = large_value;
+ data_after_compact["large_key3"] = large_value;
+ VerifyDB(data_after_compact);
+}
+
+// File should be evicted after expiration.
+TEST_F(BlobDBTest, EvictExpiredFile) {
+ BlobDBOptions bdb_options;
+ bdb_options.ttl_range_secs = 100;
+ bdb_options.min_blob_size = 0;
+ bdb_options.disable_background_tasks = true;
+ Options options;
+ options.env = mock_env_.get();
+ Open(bdb_options, options);
+ mock_env_->set_current_time(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+ mock_env_->set_current_time(250);
+ // The key should be expired now.
+ blob_db_impl()->TEST_EvictExpiredFiles(); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + ASSERT_TRUE(blob_file->Immutable()); + ASSERT_TRUE(blob_file->Obsolete()); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size()); + // Make sure we don't return garbage value after blob file being evicted, + // but the blob index still exists in the LSM tree. + std::string val = ""; + ASSERT_TRUE(blob_db_->Get(ReadOptions(), "foo", &val).IsNotFound()); + ASSERT_EQ("", val); +} + +TEST_F(BlobDBTest, DisableFileDeletions) { + BlobDBOptions bdb_options; + bdb_options.disable_background_tasks = true; + Open(bdb_options); + std::map<std::string, std::string> data; + for (bool force : {true, false}) { + ASSERT_OK(Put("foo", "v", &data)); + auto blob_files = blob_db_impl()->TEST_GetBlobFiles(); + ASSERT_EQ(1, blob_files.size()); + auto blob_file = blob_files[0]; + ASSERT_OK(blob_db_impl()->TEST_CloseBlobFile(blob_file)); + blob_db_impl()->TEST_ObsoleteBlobFile(blob_file); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + // Call DisableFileDeletions twice. + ASSERT_OK(blob_db_->DisableFileDeletions()); + ASSERT_OK(blob_db_->DisableFileDeletions()); + // File deletions should be disabled. + blob_db_impl()->TEST_DeleteObsoleteFiles(); + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + VerifyDB(data); + // Enable file deletions once. If force=true, file deletion is enabled. + // Otherwise it needs to enable it for a second time. + ASSERT_OK(blob_db_->EnableFileDeletions(force)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + if (!force) { + ASSERT_EQ(1, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(1, blob_db_impl()->TEST_GetObsoleteFiles().size()); + VerifyDB(data); + // Call EnableFileDeletions a second time. + ASSERT_OK(blob_db_->EnableFileDeletions(false)); + blob_db_impl()->TEST_DeleteObsoleteFiles(); + } + // Regardless of value of `force`, file should be deleted by now. + ASSERT_EQ(0, blob_db_impl()->TEST_GetBlobFiles().size()); + ASSERT_EQ(0, blob_db_impl()->TEST_GetObsoleteFiles().size()); + VerifyDB({}); + } +} + +TEST_F(BlobDBTest, ShutdownWait) { + BlobDBOptions bdb_options; + bdb_options.ttl_range_secs = 100; + bdb_options.min_blob_size = 0; + bdb_options.disable_background_tasks = false; + Options options; + options.env = mock_env_.get(); + + SyncPoint::GetInstance()->LoadDependency({ + {"BlobDBImpl::EvictExpiredFiles:0", "BlobDBTest.ShutdownWait:0"}, + {"BlobDBTest.ShutdownWait:1", "BlobDBImpl::EvictExpiredFiles:1"}, + {"BlobDBImpl::EvictExpiredFiles:2", "BlobDBTest.ShutdownWait:2"}, + {"BlobDBTest.ShutdownWait:3", "BlobDBImpl::EvictExpiredFiles:3"}, + }); + // Force all tasks to be scheduled immediately. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "TimeQueue::Add:item.end", [&](void *arg) { + std::chrono::steady_clock::time_point *tp = + static_cast<std::chrono::steady_clock::time_point *>(arg); + *tp = + std::chrono::steady_clock::now() - std::chrono::milliseconds(10000); + }); + + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "BlobDBImpl::EvictExpiredFiles:cb", [&](void * /*arg*/) { + // Sleep 3 ms to increase the chance of data race. 
+ // We've synced up the code so that EvictExpiredFiles()
+ // is called concurrently with ~BlobDBImpl().
+ // ~BlobDBImpl() is supposed to wait for all background
+ // tasks to shut down before doing anything else. In order
+ // to use the same test to reproduce a bug in the waiting
+ // logic, we wait a little bit here, so that TSAN can
+ // catch the data race.
+ // We should improve the test if we find a better way.
+ Env::Default()->SleepForMicroseconds(3000);
+ });
+
+ SyncPoint::GetInstance()->EnableProcessing();
+
+ Open(bdb_options, options);
+ mock_env_->set_current_time(50);
+ std::map<std::string, std::string> data;
+ ASSERT_OK(PutWithTTL("foo", "bar", 100, &data));
+ auto blob_files = blob_db_impl()->TEST_GetBlobFiles();
+ ASSERT_EQ(1, blob_files.size());
+ auto blob_file = blob_files[0];
+ ASSERT_FALSE(blob_file->Immutable());
+ ASSERT_FALSE(blob_file->Obsolete());
+ VerifyDB(data);
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:0");
+ mock_env_->set_current_time(250);
+ // The key should be expired now.
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:1");
+
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:2");
+ TEST_SYNC_POINT("BlobDBTest.ShutdownWait:3");
+ Close();
+
+ SyncPoint::GetInstance()->DisableProcessing();
+}
+
+} // namespace blob_db
+} // namespace rocksdb
+
+// A black-box test for the BlobDB wrapper around rocksdb
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "SKIPPED as BlobDB is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.cc b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc new file mode 100644 index 00000000..37eee19d --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.cc @@ -0,0 +1,279 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory).
+#ifndef ROCKSDB_LITE + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include "utilities/blob_db/blob_dump_tool.h" +#include <inttypes.h> +#include <stdio.h> +#include <iostream> +#include <memory> +#include <string> +#include "port/port.h" +#include "rocksdb/convenience.h" +#include "rocksdb/env.h" +#include "table/format.h" +#include "util/coding.h" +#include "util/file_reader_writer.h" +#include "util/string_util.h" + +namespace rocksdb { +namespace blob_db { + +BlobDumpTool::BlobDumpTool() + : reader_(nullptr), buffer_(nullptr), buffer_size_(0) {} + +Status BlobDumpTool::Run(const std::string& filename, DisplayType show_key, + DisplayType show_blob, + DisplayType show_uncompressed_blob, + bool show_summary) { + constexpr size_t kReadaheadSize = 2 * 1024 * 1024; + Status s; + Env* env = Env::Default(); + s = env->FileExists(filename); + if (!s.ok()) { + return s; + } + uint64_t file_size = 0; + s = env->GetFileSize(filename, &file_size); + if (!s.ok()) { + return s; + } + std::unique_ptr<RandomAccessFile> file; + s = env->NewRandomAccessFile(filename, &file, EnvOptions()); + if (!s.ok()) { + return s; + } + file = NewReadaheadRandomAccessFile(std::move(file), kReadaheadSize); + if (file_size == 0) { + return Status::Corruption("File is empty."); + } + reader_.reset(new RandomAccessFileReader(std::move(file), filename)); + uint64_t offset = 0; + uint64_t footer_offset = 0; + CompressionType compression = kNoCompression; + s = DumpBlobLogHeader(&offset, &compression); + if (!s.ok()) { + return s; + } + s = DumpBlobLogFooter(file_size, &footer_offset); + if (!s.ok()) { + return s; + } + uint64_t total_records = 0; + uint64_t total_key_size = 0; + uint64_t total_blob_size = 0; + uint64_t total_uncompressed_blob_size = 0; + if (show_key != DisplayType::kNone || show_summary) { + while (offset < footer_offset) { + s = DumpRecord(show_key, show_blob, show_uncompressed_blob, show_summary, + compression, &offset, &total_records, &total_key_size, + &total_blob_size, &total_uncompressed_blob_size); + if (!s.ok()) { + break; + } + } + } + if (show_summary) { + fprintf(stdout, "Summary:\n"); + fprintf(stdout, " total records: %" PRIu64 "\n", total_records); + fprintf(stdout, " total key size: %" PRIu64 "\n", total_key_size); + fprintf(stdout, " total blob size: %" PRIu64 "\n", total_blob_size); + if (compression != kNoCompression) { + fprintf(stdout, " total raw blob size: %" PRIu64 "\n", + total_uncompressed_blob_size); + } + } + return s; +} + +Status BlobDumpTool::Read(uint64_t offset, size_t size, Slice* result) { + if (buffer_size_ < size) { + if (buffer_size_ == 0) { + buffer_size_ = 4096; + } + while (buffer_size_ < size) { + buffer_size_ *= 2; + } + buffer_.reset(new char[buffer_size_]); + } + Status s = reader_->Read(offset, size, result, buffer_.get()); + if (!s.ok()) { + return s; + } + if (result->size() != size) { + return Status::Corruption("Reach the end of the file unexpectedly."); + } + return s; +} + +Status BlobDumpTool::DumpBlobLogHeader(uint64_t* offset, + CompressionType* compression) { + Slice slice; + Status s = Read(0, BlobLogHeader::kSize, &slice); + if (!s.ok()) { + return s; + } + BlobLogHeader header; + s = header.DecodeFrom(slice); + if (!s.ok()) { + return s; + } + fprintf(stdout, "Blob log header:\n"); + fprintf(stdout, " Version : %" PRIu32 "\n", header.version); + fprintf(stdout, " Column Family ID : %" PRIu32 "\n", + header.column_family_id); + std::string compression_str; + if (!GetStringFromCompressionType(&compression_str, 
header.compression) + .ok()) { + compression_str = "Unrecongnized compression type (" + + ToString((int)header.compression) + ")"; + } + fprintf(stdout, " Compression : %s\n", compression_str.c_str()); + fprintf(stdout, " Expiration range : %s\n", + GetString(header.expiration_range).c_str()); + *offset = BlobLogHeader::kSize; + *compression = header.compression; + return s; +} + +Status BlobDumpTool::DumpBlobLogFooter(uint64_t file_size, + uint64_t* footer_offset) { + auto no_footer = [&]() { + *footer_offset = file_size; + fprintf(stdout, "No blob log footer.\n"); + return Status::OK(); + }; + if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) { + return no_footer(); + } + Slice slice; + *footer_offset = file_size - BlobLogFooter::kSize; + Status s = Read(*footer_offset, BlobLogFooter::kSize, &slice); + if (!s.ok()) { + return s; + } + BlobLogFooter footer; + s = footer.DecodeFrom(slice); + if (!s.ok()) { + return no_footer(); + } + fprintf(stdout, "Blob log footer:\n"); + fprintf(stdout, " Blob count : %" PRIu64 "\n", footer.blob_count); + fprintf(stdout, " Expiration Range : %s\n", + GetString(footer.expiration_range).c_str()); + return s; +} + +Status BlobDumpTool::DumpRecord(DisplayType show_key, DisplayType show_blob, + DisplayType show_uncompressed_blob, + bool show_summary, CompressionType compression, + uint64_t* offset, uint64_t* total_records, + uint64_t* total_key_size, + uint64_t* total_blob_size, + uint64_t* total_uncompressed_blob_size) { + if (show_key != DisplayType::kNone) { + fprintf(stdout, "Read record with offset 0x%" PRIx64 " (%" PRIu64 "):\n", + *offset, *offset); + } + Slice slice; + Status s = Read(*offset, BlobLogRecord::kHeaderSize, &slice); + if (!s.ok()) { + return s; + } + BlobLogRecord record; + s = record.DecodeHeaderFrom(slice); + if (!s.ok()) { + return s; + } + uint64_t key_size = record.key_size; + uint64_t value_size = record.value_size; + if (show_key != DisplayType::kNone) { + fprintf(stdout, " key size : %" PRIu64 "\n", key_size); + fprintf(stdout, " value size : %" PRIu64 "\n", value_size); + fprintf(stdout, " expiration : %" PRIu64 "\n", record.expiration); + } + *offset += BlobLogRecord::kHeaderSize; + s = Read(*offset, static_cast<size_t>(key_size + value_size), &slice); + if (!s.ok()) { + return s; + } + // Decompress value + std::string uncompressed_value; + if (compression != kNoCompression && + (show_uncompressed_blob != DisplayType::kNone || show_summary)) { + BlockContents contents; + UncompressionContext context(compression); + UncompressionInfo info(context, UncompressionDict::GetEmptyDict(), + compression); + s = UncompressBlockContentsForCompressionType( + info, slice.data() + key_size, static_cast<size_t>(value_size), + &contents, 2 /*compress_format_version*/, + ImmutableCFOptions(Options())); + if (!s.ok()) { + return s; + } + uncompressed_value = contents.data.ToString(); + } + if (show_key != DisplayType::kNone) { + fprintf(stdout, " key : "); + DumpSlice(Slice(slice.data(), static_cast<size_t>(key_size)), show_key); + if (show_blob != DisplayType::kNone) { + fprintf(stdout, " blob : "); + DumpSlice(Slice(slice.data() + static_cast<size_t>(key_size), static_cast<size_t>(value_size)), show_blob); + } + if (show_uncompressed_blob != DisplayType::kNone) { + fprintf(stdout, " raw blob : "); + DumpSlice(Slice(uncompressed_value), show_uncompressed_blob); + } + } + *offset += key_size + value_size; + *total_records += 1; + *total_key_size += key_size; + *total_blob_size += value_size; + *total_uncompressed_blob_size += 
uncompressed_value.size(); + return s; +} + +void BlobDumpTool::DumpSlice(const Slice s, DisplayType type) { + if (type == DisplayType::kRaw) { + fprintf(stdout, "%s\n", s.ToString().c_str()); + } else if (type == DisplayType::kHex) { + fprintf(stdout, "%s\n", s.ToString(true /*hex*/).c_str()); + } else if (type == DisplayType::kDetail) { + char buf[100]; + for (size_t i = 0; i < s.size(); i += 16) { + memset(buf, 0, sizeof(buf)); + for (size_t j = 0; j < 16 && i + j < s.size(); j++) { + unsigned char c = s[i + j]; + snprintf(buf + j * 3 + 15, 2, "%x", c >> 4); + snprintf(buf + j * 3 + 16, 2, "%x", c & 0xf); + snprintf(buf + j + 65, 2, "%c", (0x20 <= c && c <= 0x7e) ? c : '.'); + } + for (size_t p = 0; p < sizeof(buf) - 1; p++) { + if (buf[p] == 0) { + buf[p] = ' '; + } + } + fprintf(stdout, "%s\n", i == 0 ? buf + 15 : buf); + } + } +} + +template <class T> +std::string BlobDumpTool::GetString(std::pair<T, T> p) { + if (p.first == 0 && p.second == 0) { + return "nil"; + } + return "(" + ToString(p.first) + ", " + ToString(p.second) + ")"; +} + +} // namespace blob_db +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_dump_tool.h b/src/rocksdb/utilities/blob_db/blob_dump_tool.h new file mode 100644 index 00000000..ff4672fd --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_dump_tool.h @@ -0,0 +1,57 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once +#ifndef ROCKSDB_LITE + +#include <memory> +#include <string> +#include <utility> +#include "rocksdb/slice.h" +#include "rocksdb/status.h" +#include "util/file_reader_writer.h" +#include "utilities/blob_db/blob_log_format.h" + +namespace rocksdb { +namespace blob_db { + +class BlobDumpTool { + public: + enum class DisplayType { + kNone, + kRaw, + kHex, + kDetail, + }; + + BlobDumpTool(); + + Status Run(const std::string& filename, DisplayType show_key, + DisplayType show_blob, DisplayType show_uncompressed_blob, + bool show_summary); + + private: + std::unique_ptr<RandomAccessFileReader> reader_; + std::unique_ptr<char[]> buffer_; + size_t buffer_size_; + + Status Read(uint64_t offset, size_t size, Slice* result); + Status DumpBlobLogHeader(uint64_t* offset, CompressionType* compression); + Status DumpBlobLogFooter(uint64_t file_size, uint64_t* footer_offset); + Status DumpRecord(DisplayType show_key, DisplayType show_blob, + DisplayType show_uncompressed_blob, bool show_summary, + CompressionType compression, uint64_t* offset, + uint64_t* total_records, uint64_t* total_key_size, + uint64_t* total_blob_size, + uint64_t* total_uncompressed_blob_size); + void DumpSlice(const Slice s, DisplayType type); + + template <class T> + std::string GetString(std::pair<T, T> p); +}; + +} // namespace blob_db +} // namespace rocksdb + +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_file.cc b/src/rocksdb/utilities/blob_db/blob_file.cc new file mode 100644 index 00000000..3bcbd048 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_file.cc @@ -0,0 +1,336 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). 
+#ifndef ROCKSDB_LITE +#include "utilities/blob_db/blob_file.h" + +#ifndef __STDC_FORMAT_MACROS +#define __STDC_FORMAT_MACROS +#endif + +#include <inttypes.h> +#include <stdio.h> + +#include <algorithm> +#include <limits> +#include <memory> + +#include "db/column_family.h" +#include "db/db_impl.h" +#include "db/dbformat.h" +#include "util/filename.h" +#include "util/logging.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace rocksdb { + +namespace blob_db { + +BlobFile::BlobFile() + : parent_(nullptr), + file_number_(0), + info_log_(nullptr), + column_family_id_(std::numeric_limits<uint32_t>::max()), + compression_(kNoCompression), + has_ttl_(false), + blob_count_(0), + file_size_(0), + closed_(false), + obsolete_(false), + expiration_range_({0, 0}), + last_access_(-1), + last_fsync_(0), + header_valid_(false), + footer_valid_(false) {} + +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log) + : parent_(p), + path_to_dir_(bdir), + file_number_(fn), + info_log_(info_log), + column_family_id_(std::numeric_limits<uint32_t>::max()), + compression_(kNoCompression), + has_ttl_(false), + blob_count_(0), + file_size_(0), + closed_(false), + obsolete_(false), + expiration_range_({0, 0}), + last_access_(-1), + last_fsync_(0), + header_valid_(false), + footer_valid_(false) {} + +BlobFile::~BlobFile() { + if (obsolete_) { + std::string pn(PathName()); + Status s = Env::Default()->DeleteFile(PathName()); + if (!s.ok()) { + // ROCKS_LOG_INFO(db_options_.info_log, + // "File could not be deleted %s", pn.c_str()); + } + } +} + +uint32_t BlobFile::column_family_id() const { return column_family_id_; } + +std::string BlobFile::PathName() const { + return BlobFileName(path_to_dir_, file_number_); +} + +std::shared_ptr<Reader> BlobFile::OpenRandomAccessReader( + Env* env, const DBOptions& db_options, + const EnvOptions& env_options) const { + constexpr size_t kReadaheadSize = 2 * 1024 * 1024; + std::unique_ptr<RandomAccessFile> sfile; + std::string path_name(PathName()); + Status s = env->NewRandomAccessFile(path_name, &sfile, env_options); + if (!s.ok()) { + // report something here. + return nullptr; + } + sfile = NewReadaheadRandomAccessFile(std::move(sfile), kReadaheadSize); + + std::unique_ptr<RandomAccessFileReader> sfile_reader; + sfile_reader.reset(new RandomAccessFileReader(std::move(sfile), path_name)); + + std::shared_ptr<Reader> log_reader = std::make_shared<Reader>( + std::move(sfile_reader), db_options.env, db_options.statistics.get()); + + return log_reader; +} + +std::string BlobFile::DumpState() const { + char str[1000]; + snprintf( + str, sizeof(str), + "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 + " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 + "), writer: %d reader: %d", + path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), + closed_.load(), obsolete_.load(), expiration_range_.first, + expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); + return str; +} + +void BlobFile::MarkObsolete(SequenceNumber sequence) { + assert(Immutable()); + obsolete_sequence_ = sequence; + obsolete_.store(true); +} + +bool BlobFile::NeedsFsync(bool hard, uint64_t bytes_per_sync) const { + assert(last_fsync_ <= file_size_); + return (hard) ? 
+                : (file_size_ - last_fsync_) >= bytes_per_sync;
+}
+
+Status BlobFile::WriteFooterAndCloseLocked() {
+  BlobLogFooter footer;
+  footer.blob_count = blob_count_;
+  if (HasTTL()) {
+    footer.expiration_range = expiration_range_;
+  }
+
+  // This will close the file and reset the writable file pointer.
+  Status s = log_writer_->AppendFooter(footer);
+  if (s.ok()) {
+    closed_ = true;
+    file_size_ += BlobLogFooter::kSize;
+  }
+  // Release the sequential writer.
+  log_writer_.reset();
+  return s;
+}
+
+Status BlobFile::ReadFooter(BlobLogFooter* bf) {
+  if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) {
+    return Status::IOError("File does not have footer", PathName());
+  }
+
+  uint64_t footer_offset = file_size_ - BlobLogFooter::kSize;
+  // ra_file_reader_ must already be valid when this is called.
+  assert(ra_file_reader_);
+
+  Slice result;
+  char scratch[BlobLogFooter::kSize + 10];
+  Status s = ra_file_reader_->Read(footer_offset, BlobLogFooter::kSize,
+                                   &result, scratch);
+  if (!s.ok()) return s;
+  if (result.size() != BlobLogFooter::kSize) {
+    // should not happen
+    return Status::IOError("EOF reached before footer");
+  }
+
+  s = bf->DecodeFrom(result);
+  return s;
+}
+
+Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) {
+  // Assume that the file has been fully fsync'd.
+  last_fsync_.store(file_size_);
+  blob_count_ = footer.blob_count;
+  expiration_range_ = footer.expiration_range;
+  closed_ = true;
+  return Status::OK();
+}
+
+Status BlobFile::Fsync() {
+  Status s;
+  if (log_writer_.get()) {
+    s = log_writer_->Sync();
+    last_fsync_.store(file_size_.load());
+  }
+  return s;
+}
+
+void BlobFile::CloseRandomAccessLocked() {
+  ra_file_reader_.reset();
+  last_access_ = -1;
+}
+
+Status BlobFile::GetReader(Env* env, const EnvOptions& env_options,
+                           std::shared_ptr<RandomAccessFileReader>* reader,
+                           bool* fresh_open) {
+  assert(reader != nullptr);
+  assert(fresh_open != nullptr);
+  *fresh_open = false;
+  int64_t current_time = 0;
+  env->GetCurrentTime(&current_time);
+  last_access_.store(current_time);
+  Status s;
+
+  {
+    ReadLock lockbfile_r(&mutex_);
+    if (ra_file_reader_) {
+      *reader = ra_file_reader_;
+      return s;
+    }
+  }
+
+  WriteLock lockbfile_w(&mutex_);
+  // Double-check under the write lock.
+  if (ra_file_reader_) {
+    *reader = ra_file_reader_;
+    return s;
+  }
+
+  std::unique_ptr<RandomAccessFile> rfile;
+  s = env->NewRandomAccessFile(PathName(), &rfile, env_options);
+  if (!s.ok()) {
+    ROCKS_LOG_ERROR(info_log_,
+                    "Failed to open blob file for random-read: %s status: '%s'"
+                    " exists: '%s'",
+                    PathName().c_str(), s.ToString().c_str(),
+                    env->FileExists(PathName()).ToString().c_str());
+    return s;
+  }
+
+  ra_file_reader_ = std::make_shared<RandomAccessFileReader>(std::move(rfile),
+                                                             PathName());
+  *reader = ra_file_reader_;
+  *fresh_open = true;
+  return s;
+}
+
+Status BlobFile::ReadMetadata(Env* env, const EnvOptions& env_options) {
+  assert(Immutable());
+  // Get file size.
+  uint64_t file_size = 0;
+  Status s = env->GetFileSize(PathName(), &file_size);
+  if (s.ok()) {
+    file_size_ = file_size;
+  } else {
+    ROCKS_LOG_ERROR(info_log_,
+                    "Failed to get size of blob file %" PRIu64
+                    ", status: %s",
+                    file_number_, s.ToString().c_str());
+    return s;
+  }
+  if (file_size < BlobLogHeader::kSize) {
+    ROCKS_LOG_ERROR(info_log_,
+                    "Incomplete blob file %" PRIu64
+                    ", size: %" PRIu64,
+                    file_number_, file_size);
+    return Status::Corruption("Incomplete blob file header.");
+  }
+
+  // Create file reader.
+ std::unique_ptr<RandomAccessFile> file; + s = env->NewRandomAccessFile(PathName(), &file, env_options); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to open blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + std::unique_ptr<RandomAccessFileReader> file_reader( + new RandomAccessFileReader(std::move(file), PathName())); + + // Read file header. + char header_buf[BlobLogHeader::kSize]; + Slice header_slice; + s = file_reader->Read(0, BlobLogHeader::kSize, &header_slice, header_buf); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to read header of blob file %" PRIu64 + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogHeader header; + s = header.DecodeFrom(header_slice); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to decode header of blob file %" PRIu64 + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + column_family_id_ = header.column_family_id; + compression_ = header.compression; + has_ttl_ = header.has_ttl; + if (has_ttl_) { + expiration_range_ = header.expiration_range; + } + header_valid_ = true; + + // Read file footer. + if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + char footer_buf[BlobLogFooter::kSize]; + Slice footer_slice; + s = file_reader->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, + &footer_slice, footer_buf); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to read footer of blob file %" PRIu64 + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogFooter footer; + s = footer.DecodeFrom(footer_slice); + if (!s.ok()) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + blob_count_ = footer.blob_count; + if (has_ttl_) { + assert(header.expiration_range.first <= footer.expiration_range.first); + assert(header.expiration_range.second >= footer.expiration_range.second); + expiration_range_ = footer.expiration_range; + } + footer_valid_ = true; + return Status::OK(); +} + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_file.h b/src/rocksdb/utilities/blob_db/blob_file.h new file mode 100644 index 00000000..668a0372 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_file.h @@ -0,0 +1,214 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#pragma once +#ifndef ROCKSDB_LITE + +#include <atomic> +#include <memory> + +#include "port/port.h" +#include "rocksdb/env.h" +#include "rocksdb/options.h" +#include "util/file_reader_writer.h" +#include "utilities/blob_db/blob_log_format.h" +#include "utilities/blob_db/blob_log_reader.h" +#include "utilities/blob_db/blob_log_writer.h" + +namespace rocksdb { +namespace blob_db { + +class BlobDBImpl; + +class BlobFile { + friend class BlobDBImpl; + friend struct BlobFileComparator; + friend struct BlobFileComparatorTTL; + + private: + // access to parent + const BlobDBImpl* parent_; + + // path to blob directory + std::string path_to_dir_; + + // the id of the file. + // the above 2 are created during file creation and never changed + // after that + uint64_t file_number_; + + // Info log. + Logger* info_log_; + + // Column family id. 
+  uint32_t column_family_id_;
+
+  // Compression type of blobs in the file
+  CompressionType compression_;
+
+  // If true, all keys in this file have a TTL. Otherwise, no key in the
+  // file has a TTL.
+  bool has_ttl_;
+
+  // number of blobs in the file
+  std::atomic<uint64_t> blob_count_;
+
+  // size of the file
+  std::atomic<uint64_t> file_size_;
+
+  BlobLogHeader header_;
+
+  // closed_ = true implies the file is no longer mutable:
+  // no more blobs will be appended and the footer has been written out
+  std::atomic<bool> closed_;
+
+  // Set once a pass of garbage collection has successfully finished on this
+  // file. Even with obsolete_ set, iterator/snapshot checks are still
+  // required before the file can actually be dropped.
+  std::atomic<bool> obsolete_;
+
+  // The last sequence number at the time the file was marked obsolete.
+  // Data in this file is visible to a snapshot taken before the sequence.
+  SequenceNumber obsolete_sequence_;
+
+  ExpirationRange expiration_range_;
+
+  // Sequential/append writer for blobs
+  std::shared_ptr<Writer> log_writer_;
+
+  // random access file reader for Get() calls
+  std::shared_ptr<RandomAccessFileReader> ra_file_reader_;
+
+  // This read-write mutex is per-file and protects all of its
+  // data structures
+  mutable port::RWMutex mutex_;
+
+  // time when the random access reader was last created.
+  std::atomic<std::int64_t> last_access_;
+
+  // last time the file was fsync'd/fdatasync'd
+  std::atomic<uint64_t> last_fsync_;
+
+  bool header_valid_;
+
+  bool footer_valid_;
+
+  SequenceNumber garbage_collection_finish_sequence_;
+
+ public:
+  BlobFile();
+
+  BlobFile(const BlobDBImpl* parent, const std::string& bdir, uint64_t fnum,
+           Logger* info_log);
+
+  ~BlobFile();
+
+  uint32_t column_family_id() const;
+
+  void SetColumnFamilyId(uint32_t cf_id) {
+    column_family_id_ = cf_id;
+  }
+
+  // Returns the log file's absolute pathname.
+  std::string PathName() const;
+
+  // Primary identifier for the blob file.
+  // Once the file is created, this never changes.
+  uint64_t BlobFileNumber() const { return file_number_; }
+
+  // The following functions are atomic, and don't need the read lock.
+  uint64_t BlobCount() const {
+    return blob_count_.load(std::memory_order_acquire);
+  }
+
+  std::string DumpState() const;
+
+  // Whether the file is no longer accepting appends.
+  bool Immutable() const { return closed_.load(); }
+
+  // Mark the file as immutable.
+  // REQUIRES: write lock held, or access from single thread (on DB open).
+  void MarkImmutable() { closed_ = true; }
+
+  // Whether the file has gone through GC and its blobs have been relocated.
+  bool Obsolete() const {
+    assert(Immutable() || !obsolete_.load());
+    return obsolete_.load();
+  }
+
+  // Mark the file as obsolete by garbage collection. The file is not visible
+  // to snapshots with a sequence greater than or equal to the given sequence.
+  void MarkObsolete(SequenceNumber sequence);
+
+  SequenceNumber GetObsoleteSequence() const {
+    assert(Obsolete());
+    return obsolete_sequence_;
+  }
+
+  // We assume this check behaves atomically.
+  bool NeedsFsync(bool hard, uint64_t bytes_per_sync) const;
+
+  Status Fsync();
+
+  uint64_t GetFileSize() const {
+    return file_size_.load(std::memory_order_acquire);
+  }
+
+  // All getters that are not atomic require a ReadLock on the mutex.
+
+  ExpirationRange GetExpirationRange() const { return expiration_range_; }
+
+  void ExtendExpirationRange(uint64_t expiration) {
+    expiration_range_.first = std::min(expiration_range_.first, expiration);
+    expiration_range_.second = std::max(expiration_range_.second, expiration);
+  }
+
+  bool HasTTL() const { return has_ttl_; }
+
+  void SetHasTTL(bool has_ttl) { has_ttl_ = has_ttl; }
+
+  CompressionType compression() const { return compression_; }
+
+  void SetCompression(CompressionType c) {
+    compression_ = c;
+  }
+
+  std::shared_ptr<Writer> GetWriter() const { return log_writer_; }
+
+  // Read the blob file header and footer. Return Corruption if the file
+  // header is malformed or incomplete. If the footer is malformed or
+  // incomplete, set footer_valid_ to false and return Status::OK.
+  Status ReadMetadata(Env* env, const EnvOptions& env_options);
+
+  Status GetReader(Env* env, const EnvOptions& env_options,
+                   std::shared_ptr<RandomAccessFileReader>* reader,
+                   bool* fresh_open);
+
+ private:
+  std::shared_ptr<Reader> OpenRandomAccessReader(
+      Env* env, const DBOptions& db_options,
+      const EnvOptions& env_options) const;
+
+  Status ReadFooter(BlobLogFooter* footer);
+
+  Status WriteFooterAndCloseLocked();
+
+  void CloseRandomAccessLocked();
+
+  // This is used when reading only the footer of a previously closed file.
+  Status SetFromFooterLocked(const BlobLogFooter& footer);
+
+  void set_expiration_range(const ExpirationRange& expiration_range) {
+    expiration_range_ = expiration_range;
+  }
+
+  // The following setters are atomic, and don't need locks.
+  void SetFileSize(uint64_t fs) { file_size_ = fs; }
+
+  void SetBlobCount(uint64_t bc) { blob_count_ = bc; }
+};
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_index.h b/src/rocksdb/utilities/blob_db/blob_index.h
new file mode 100644
index 00000000..fd91b547
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_index.h
@@ -0,0 +1,161 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include "rocksdb/options.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+// BlobIndex is a pointer to a blob, together with the blob's metadata. The
+// index is stored in the base DB as ValueType::kTypeBlobIndex.
+// There are three types of blob index: +// +// kInlinedTTL: +// +------+------------+---------------+ +// | type | expiration | value | +// +------+------------+---------------+ +// | char | varint64 | variable size | +// +------+------------+---------------+ +// +// kBlob: +// +------+-------------+----------+----------+-------------+ +// | type | file number | offset | size | compression | +// +------+-------------+----------+----------+-------------+ +// | char | varint64 | varint64 | varint64 | char | +// +------+-------------+----------+----------+-------------+ +// +// kBlobTTL: +// +------+------------+-------------+----------+----------+-------------+ +// | type | expiration | file number | offset | size | compression | +// +------+------------+-------------+----------+----------+-------------+ +// | char | varint64 | varint64 | varint64 | varint64 | char | +// +------+------------+-------------+----------+----------+-------------+ +// +// There isn't a kInlined (without TTL) type since we can store it as a plain +// value (i.e. ValueType::kTypeValue). +class BlobIndex { + public: + enum class Type : unsigned char { + kInlinedTTL = 0, + kBlob = 1, + kBlobTTL = 2, + kUnknown = 3, + }; + + BlobIndex() : type_(Type::kUnknown) {} + + bool IsInlined() const { return type_ == Type::kInlinedTTL; } + + bool HasTTL() const { + return type_ == Type::kInlinedTTL || type_ == Type::kBlobTTL; + } + + uint64_t expiration() const { + assert(HasTTL()); + return expiration_; + } + + const Slice& value() const { + assert(IsInlined()); + return value_; + } + + uint64_t file_number() const { + assert(!IsInlined()); + return file_number_; + } + + uint64_t offset() const { + assert(!IsInlined()); + return offset_; + } + + uint64_t size() const { + assert(!IsInlined()); + return size_; + } + + Status DecodeFrom(Slice slice) { + static const std::string kErrorMessage = "Error while decoding blob index"; + assert(slice.size() > 0); + type_ = static_cast<Type>(*slice.data()); + if (type_ >= Type::kUnknown) { + return Status::Corruption( + kErrorMessage, + "Unknown blob index type: " + ToString(static_cast<char>(type_))); + } + slice = Slice(slice.data() + 1, slice.size() - 1); + if (HasTTL()) { + if (!GetVarint64(&slice, &expiration_)) { + return Status::Corruption(kErrorMessage, "Corrupted expiration"); + } + } + if (IsInlined()) { + value_ = slice; + } else { + if (GetVarint64(&slice, &file_number_) && GetVarint64(&slice, &offset_) && + GetVarint64(&slice, &size_) && slice.size() == 1) { + compression_ = static_cast<CompressionType>(*slice.data()); + } else { + return Status::Corruption(kErrorMessage, "Corrupted blob offset"); + } + } + return Status::OK(); + } + + static void EncodeInlinedTTL(std::string* dst, uint64_t expiration, + const Slice& value) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(1 + kMaxVarint64Length + value.size()); + dst->push_back(static_cast<char>(Type::kInlinedTTL)); + PutVarint64(dst, expiration); + dst->append(value.data(), value.size()); + } + + static void EncodeBlob(std::string* dst, uint64_t file_number, + uint64_t offset, uint64_t size, + CompressionType compression) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(kMaxVarint64Length * 3 + 2); + dst->push_back(static_cast<char>(Type::kBlob)); + PutVarint64(dst, file_number); + PutVarint64(dst, offset); + PutVarint64(dst, size); + dst->push_back(static_cast<char>(compression)); + } + + static void EncodeBlobTTL(std::string* dst, uint64_t expiration, + uint64_t file_number, uint64_t offset, + uint64_t 
size, CompressionType compression) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(kMaxVarint64Length * 4 + 2); + dst->push_back(static_cast<char>(Type::kBlobTTL)); + PutVarint64(dst, expiration); + PutVarint64(dst, file_number); + PutVarint64(dst, offset); + PutVarint64(dst, size); + dst->push_back(static_cast<char>(compression)); + } + + private: + Type type_ = Type::kUnknown; + uint64_t expiration_ = 0; + Slice value_; + uint64_t file_number_ = 0; + uint64_t offset_ = 0; + uint64_t size_ = 0; + CompressionType compression_ = kNoCompression; +}; + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_log_format.cc b/src/rocksdb/utilities/blob_db/blob_log_format.cc new file mode 100644 index 00000000..8726cb8f --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_log_format.cc @@ -0,0 +1,149 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_log_format.h" + +#include "util/coding.h" +#include "util/crc32c.h" + +namespace rocksdb { +namespace blob_db { + +void BlobLogHeader::EncodeTo(std::string* dst) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(BlobLogHeader::kSize); + PutFixed32(dst, kMagicNumber); + PutFixed32(dst, version); + PutFixed32(dst, column_family_id); + unsigned char flags = (has_ttl ? 1 : 0); + dst->push_back(flags); + dst->push_back(compression); + PutFixed64(dst, expiration_range.first); + PutFixed64(dst, expiration_range.second); +} + +Status BlobLogHeader::DecodeFrom(Slice src) { + static const std::string kErrorMessage = + "Error while decoding blob log header"; + if (src.size() != BlobLogHeader::kSize) { + return Status::Corruption(kErrorMessage, + "Unexpected blob file header size"); + } + uint32_t magic_number; + unsigned char flags; + if (!GetFixed32(&src, &magic_number) || !GetFixed32(&src, &version) || + !GetFixed32(&src, &column_family_id)) { + return Status::Corruption( + kErrorMessage, + "Error decoding magic number, version and column family id"); + } + if (magic_number != kMagicNumber) { + return Status::Corruption(kErrorMessage, "Magic number mismatch"); + } + if (version != kVersion1) { + return Status::Corruption(kErrorMessage, "Unknown header version"); + } + flags = src.data()[0]; + compression = static_cast<CompressionType>(src.data()[1]); + has_ttl = (flags & 1) == 1; + src.remove_prefix(2); + if (!GetFixed64(&src, &expiration_range.first) || + !GetFixed64(&src, &expiration_range.second)) { + return Status::Corruption(kErrorMessage, "Error decoding expiration range"); + } + return Status::OK(); +} + +void BlobLogFooter::EncodeTo(std::string* dst) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(BlobLogFooter::kSize); + PutFixed32(dst, kMagicNumber); + PutFixed64(dst, blob_count); + PutFixed64(dst, expiration_range.first); + PutFixed64(dst, expiration_range.second); + crc = crc32c::Value(dst->c_str(), dst->size()); + crc = crc32c::Mask(crc); + PutFixed32(dst, crc); +} + +Status BlobLogFooter::DecodeFrom(Slice src) { + static const std::string kErrorMessage = + "Error while decoding blob log footer"; + if (src.size() != BlobLogFooter::kSize) { + return Status::Corruption(kErrorMessage, + "Unexpected blob file footer size"); + } + uint32_t src_crc = 0; + src_crc = 
crc32c::Value(src.data(), BlobLogFooter::kSize - sizeof(uint32_t)); + src_crc = crc32c::Mask(src_crc); + uint32_t magic_number = 0; + if (!GetFixed32(&src, &magic_number) || !GetFixed64(&src, &blob_count) || + !GetFixed64(&src, &expiration_range.first) || + !GetFixed64(&src, &expiration_range.second) || !GetFixed32(&src, &crc)) { + return Status::Corruption(kErrorMessage, "Error decoding content"); + } + if (magic_number != kMagicNumber) { + return Status::Corruption(kErrorMessage, "Magic number mismatch"); + } + if (src_crc != crc) { + return Status::Corruption(kErrorMessage, "CRC mismatch"); + } + return Status::OK(); +} + +void BlobLogRecord::EncodeHeaderTo(std::string* dst) { + assert(dst != nullptr); + dst->clear(); + dst->reserve(BlobLogRecord::kHeaderSize + key.size() + value.size()); + PutFixed64(dst, key.size()); + PutFixed64(dst, value.size()); + PutFixed64(dst, expiration); + header_crc = crc32c::Value(dst->c_str(), dst->size()); + header_crc = crc32c::Mask(header_crc); + PutFixed32(dst, header_crc); + blob_crc = crc32c::Value(key.data(), key.size()); + blob_crc = crc32c::Extend(blob_crc, value.data(), value.size()); + blob_crc = crc32c::Mask(blob_crc); + PutFixed32(dst, blob_crc); +} + +Status BlobLogRecord::DecodeHeaderFrom(Slice src) { + static const std::string kErrorMessage = "Error while decoding blob record"; + if (src.size() != BlobLogRecord::kHeaderSize) { + return Status::Corruption(kErrorMessage, + "Unexpected blob record header size"); + } + uint32_t src_crc = 0; + src_crc = crc32c::Value(src.data(), BlobLogRecord::kHeaderSize - 8); + src_crc = crc32c::Mask(src_crc); + if (!GetFixed64(&src, &key_size) || !GetFixed64(&src, &value_size) || + !GetFixed64(&src, &expiration) || !GetFixed32(&src, &header_crc) || + !GetFixed32(&src, &blob_crc)) { + return Status::Corruption(kErrorMessage, "Error decoding content"); + } + if (src_crc != header_crc) { + return Status::Corruption(kErrorMessage, "Header CRC mismatch"); + } + return Status::OK(); +} + +Status BlobLogRecord::CheckBlobCRC() const { + uint32_t expected_crc = 0; + expected_crc = crc32c::Value(key.data(), key.size()); + expected_crc = crc32c::Extend(expected_crc, value.data(), value.size()); + expected_crc = crc32c::Mask(expected_crc); + if (expected_crc != blob_crc) { + return Status::Corruption("Blob CRC mismatch"); + } + return Status::OK(); +} + +} // namespace blob_db +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/src/rocksdb/utilities/blob_db/blob_log_format.h b/src/rocksdb/utilities/blob_db/blob_log_format.h new file mode 100644 index 00000000..fcc042f0 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_log_format.h @@ -0,0 +1,125 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Log format information shared by reader and writer. 
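+//
+// Illustrative sketch (not part of this library): every CRC in this format
+// is a masked crc32c over the bytes that precede the CRC field, as
+// implemented in blob_log_format.cc above. Assuming util/crc32c.h, a
+// hypothetical verification helper looks like:
+//
+//   bool VerifyTrailingCrcExample(const char* data, size_t size_with_crc,
+//                                 uint32_t stored_masked_crc) {
+//     const size_t payload = size_with_crc - sizeof(uint32_t);
+//     const uint32_t expected =
+//         rocksdb::crc32c::Mask(rocksdb::crc32c::Value(data, payload));
+//     return expected == stored_masked_crc;
+//   }
+//
+// For BlobLogRecord the header CRC covers the first kHeaderSize - 8 bytes
+// (both CRC fields are excluded), and the blob CRC covers key + value.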
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <limits>
+#include <memory>
+#include <utility>
+
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/status.h"
+#include "rocksdb/types.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+constexpr uint32_t kMagicNumber = 2395959;  // 0x00248f37
+constexpr uint32_t kVersion1 = 1;
+constexpr uint64_t kNoExpiration = std::numeric_limits<uint64_t>::max();
+
+using ExpirationRange = std::pair<uint64_t, uint64_t>;
+
+// Format of blob log file header (30 bytes):
+//
+//    +--------------+---------+---------+-------+-------------+-------------------+
+//    | magic number | version |  cf id  | flags | compression | expiration range  |
+//    +--------------+---------+---------+-------+-------------+-------------------+
+//    |   Fixed32    | Fixed32 | Fixed32 | char  |    char     | Fixed64   Fixed64 |
+//    +--------------+---------+---------+-------+-------------+-------------------+
+//
+// List of flags:
+//   has_ttl: Whether the file contains TTL data.
+//
+// The expiration range in the header is a rough range based on
+// blob_db_options.ttl_range_secs.
+struct BlobLogHeader {
+  static constexpr size_t kSize = 30;
+
+  uint32_t version = kVersion1;
+  uint32_t column_family_id = 0;
+  CompressionType compression = kNoCompression;
+  bool has_ttl = false;
+  ExpirationRange expiration_range = std::make_pair(0, 0);
+
+  void EncodeTo(std::string* dst);
+
+  Status DecodeFrom(Slice slice);
+};
+
+// Format of blob log file footer (32 bytes):
+//
+//    +--------------+------------+-------------------+------------+
+//    | magic number | blob count | expiration range  | footer CRC |
+//    +--------------+------------+-------------------+------------+
+//    |   Fixed32    |  Fixed64   | Fixed64 + Fixed64 |  Fixed32   |
+//    +--------------+------------+-------------------+------------+
+//
+// The footer is present only when the blob file was properly closed.
+//
+// Unlike the same field in the file header, the expiration range in the
+// footer is the exact range of the smallest and largest expiration of the
+// data in this file.
struct BlobLogFooter {
+  static constexpr size_t kSize = 32;
+
+  uint64_t blob_count = 0;
+  ExpirationRange expiration_range = std::make_pair(0, 0);
+  uint32_t crc = 0;
+
+  void EncodeTo(std::string* dst);
+
+  Status DecodeFrom(Slice slice);
+};
+
+// Blob record format (32 bytes header + key + value):
+//
+//    +------------+--------------+------------+------------+----------+---------+-----------+
+//    | key length | value length | expiration | header CRC | blob CRC |   key   |   value   |
+//    +------------+--------------+------------+------------+----------+---------+-----------+
+//    |  Fixed64   |   Fixed64    |  Fixed64   |  Fixed32   | Fixed32  | key len | value len |
+//    +------------+--------------+------------+------------+----------+---------+-----------+
+//
+// If the file has has_ttl = false, the expiration field is always 0, and
+// the blob doesn't have an expiration.
+//
+// Also note that if compression is used, the value is the compressed value,
+// and the value length is the compressed value length.
+//
+// The header CRC is the checksum of (key_len + val_len + expiration), while
+// the blob CRC is the checksum of (key + value).
+//
+// We could use variable length encoding (Varint64) to save more space, but
+// it would make the reader more complicated.
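+// A compile-time restatement of the header arithmetic documented above
+// (three Fixed64 fields plus two Fixed32 CRCs); illustrative only:
+static_assert(3 * sizeof(uint64_t) + 2 * sizeof(uint32_t) == 32,
+              "blob record header: key len + value len + expiration + "
+              "header CRC + blob CRC");
+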
+struct BlobLogRecord {
+  // The header includes all fields up to and including the blob CRC.
+  static constexpr size_t kHeaderSize = 32;
+
+  uint64_t key_size = 0;
+  uint64_t value_size = 0;
+  uint64_t expiration = 0;
+  uint32_t header_crc = 0;
+  uint32_t blob_crc = 0;
+  Slice key;
+  Slice value;
+  std::unique_ptr<char[]> key_buf;
+  std::unique_ptr<char[]> value_buf;
+
+  uint64_t record_size() const { return kHeaderSize + key_size + value_size; }
+
+  void EncodeHeaderTo(std::string* dst);
+
+  Status DecodeHeaderFrom(Slice src);
+
+  Status CheckBlobCRC() const;
+};
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_reader.cc b/src/rocksdb/utilities/blob_db/blob_log_reader.cc
new file mode 100644
index 00000000..8ffcc2fa
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_reader.cc
@@ -0,0 +1,105 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_log_reader.h"
+
+#include <algorithm>
+
+#include "monitoring/statistics.h"
+#include "util/file_reader_writer.h"
+#include "util/stop_watch.h"
+
+namespace rocksdb {
+namespace blob_db {
+
+Reader::Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
+               Statistics* statistics)
+    : file_(std::move(file_reader)),
+      env_(env),
+      statistics_(statistics),
+      buffer_(),
+      next_byte_(0) {}
+
+Status Reader::ReadSlice(uint64_t size, Slice* slice, char* buf) {
+  StopWatch read_sw(env_, statistics_, BLOB_DB_BLOB_FILE_READ_MICROS);
+  Status s = file_->Read(next_byte_, static_cast<size_t>(size), slice, buf);
+  next_byte_ += size;
+  if (!s.ok()) {
+    return s;
+  }
+  RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_READ, slice->size());
+  if (slice->size() != size) {
+    return Status::Corruption("EOF reached while reading record");
+  }
+  return s;
+}
+
+Status Reader::ReadHeader(BlobLogHeader* header) {
+  assert(file_.get() != nullptr);
+  assert(next_byte_ == 0);
+  Status s = ReadSlice(BlobLogHeader::kSize, &buffer_, header_buf_);
+  if (!s.ok()) {
+    return s;
+  }
+
+  if (buffer_.size() != BlobLogHeader::kSize) {
+    return Status::Corruption("EOF reached before file header");
+  }
+
+  return header->DecodeFrom(buffer_);
+}
+
+Status Reader::ReadRecord(BlobLogRecord* record, ReadLevel level,
+                          uint64_t* blob_offset) {
+  Status s = ReadSlice(BlobLogRecord::kHeaderSize, &buffer_, header_buf_);
+  if (!s.ok()) {
+    return s;
+  }
+  if (buffer_.size() != BlobLogRecord::kHeaderSize) {
+    return Status::Corruption("EOF reached before record header");
+  }
+
+  s = record->DecodeHeaderFrom(buffer_);
+  if (!s.ok()) {
+    return s;
+  }
+
+  // Combined size of the key and the blob that follow the header.
+  uint64_t kb_size = record->key_size + record->value_size;
+  if (blob_offset != nullptr) {
+    *blob_offset = next_byte_ + record->key_size;
+  }
+
+  switch (level) {
+    case kReadHeader:
+      next_byte_ += kb_size;
+      break;
+
+    case kReadHeaderKey:
+      record->key_buf.reset(new char[record->key_size]);
+      s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
+      next_byte_ += record->value_size;
+      break;
+
+    case kReadHeaderKeyBlob:
+      record->key_buf.reset(new char[record->key_size]);
+      s = ReadSlice(record->key_size, &record->key, record->key_buf.get());
+      if (s.ok()) {
+        record->value_buf.reset(new char[record->value_size]);
+        s = ReadSlice(record->value_size, &record->value,
+                      record->value_buf.get());
+      }
+      if (s.ok()) {
+        s = record->CheckBlobCRC();
+      }
+      break;
+  }
+  return s;
+}
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_reader.h b/src/rocksdb/utilities/blob_db/blob_log_reader.h
new file mode 100644
index 00000000..45e2e955
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_reader.h
@@ -0,0 +1,83 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+#include "util/file_reader_writer.h"
+#include "utilities/blob_db/blob_log_format.h"
+
+namespace rocksdb {
+
+class SequentialFileReader;
+class Logger;
+
+namespace blob_db {
+
+/**
+ * Reader is a general purpose blob log stream reader implementation. The
+ * actual job of reading from the device is delegated to the
+ * RandomAccessFileReader passed in at construction.
+ *
+ * Please see Writer for details on the file and record layout.
+ */
+class Reader {
+ public:
+  enum ReadLevel {
+    kReadHeader,
+    kReadHeaderKey,
+    kReadHeaderKeyBlob,
+  };
+
+  // Create a reader that will return log records from the given file.
+  // The reader takes ownership of the file reader.
+  Reader(std::unique_ptr<RandomAccessFileReader>&& file_reader, Env* env,
+         Statistics* statistics);
+
+  ~Reader() = default;
+
+  // No copying allowed
+  Reader(const Reader&) = delete;
+  Reader& operator=(const Reader&) = delete;
+
+  Status ReadHeader(BlobLogHeader* header);
+
+  // Read the next record into *record. Returns OK on success, and a non-OK
+  // status if it hits the end of the input or a corruption. Depending on
+  // level, the key and value slices in *record point either into buffers
+  // owned by the record or into the reader's internal buffer, so they are
+  // only valid until the next read on this reader.
+  // If blob_offset is non-null, the offset of the blob is returned through
+  // it.
+  Status ReadRecord(BlobLogRecord* record, ReadLevel level = kReadHeader,
+                    uint64_t* blob_offset = nullptr);
+
+  void ResetNextByte() { next_byte_ = 0; }
+
+  uint64_t GetNextByte() const { return next_byte_; }
+
+ private:
+  Status ReadSlice(uint64_t size, Slice* slice, char* buf);
+
+  const std::unique_ptr<RandomAccessFileReader> file_;
+  Env* env_;
+  Statistics* statistics_;
+
+  Slice buffer_;
+  char header_buf_[BlobLogRecord::kHeaderSize];
+
+  // Offset of the next byte to read from the file.
+  uint64_t next_byte_;
+};
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_writer.cc b/src/rocksdb/utilities/blob_db/blob_log_writer.cc
new file mode 100644
index 00000000..51578c5c
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_writer.cc
@@ -0,0 +1,139 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
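+//
+// Illustrative sketch (not part of this library): a typical read pass over
+// a blob file with the Reader declared in blob_log_reader.h above, assuming
+// an already-opened RandomAccessFileReader; DumpAllRecordsExample is a
+// hypothetical helper:
+//
+//   rocksdb::Status DumpAllRecordsExample(
+//       std::unique_ptr<rocksdb::RandomAccessFileReader>&& file,
+//       rocksdb::Env* env) {
+//     using namespace rocksdb::blob_db;
+//     Reader reader(std::move(file), env, nullptr /* statistics */);
+//     BlobLogHeader header;
+//     rocksdb::Status s = reader.ReadHeader(&header);
+//     while (s.ok()) {
+//       BlobLogRecord record;
+//       // kReadHeaderKeyBlob also verifies the blob CRC of each record.
+//       s = reader.ReadRecord(&record, Reader::kReadHeaderKeyBlob);
+//     }
+//     // Hitting the end of the file (or a trailing footer) surfaces as a
+//     // non-OK status, so callers that need clean termination compare
+//     // reader.GetNextByte() against the file size.
+//     return s;
+//   }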
+#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_log_writer.h" + +#include <cstdint> +#include <string> + +#include "monitoring/statistics.h" +#include "rocksdb/env.h" +#include "util/coding.h" +#include "util/file_reader_writer.h" +#include "util/stop_watch.h" +#include "utilities/blob_db/blob_log_format.h" + +namespace rocksdb { +namespace blob_db { + +Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env, + Statistics* statistics, uint64_t log_number, uint64_t bpsync, + bool use_fs, uint64_t boffset) + : dest_(std::move(dest)), + env_(env), + statistics_(statistics), + log_number_(log_number), + block_offset_(boffset), + bytes_per_sync_(bpsync), + next_sync_offset_(0), + use_fsync_(use_fs), + last_elem_type_(kEtNone) {} + +Status Writer::Sync() { + StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS); + Status s = dest_->Sync(use_fsync_); + RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED); + return s; +} + +Status Writer::WriteHeader(BlobLogHeader& header) { + assert(block_offset_ == 0); + assert(last_elem_type_ == kEtNone); + std::string str; + header.EncodeTo(&str); + + Status s = dest_->Append(Slice(str)); + if (s.ok()) { + block_offset_ += str.size(); + s = dest_->Flush(); + } + last_elem_type_ = kEtFileHdr; + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + BlobLogHeader::kSize); + return s; +} + +Status Writer::AppendFooter(BlobLogFooter& footer) { + assert(block_offset_ != 0); + assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); + + std::string str; + footer.EncodeTo(&str); + + Status s = dest_->Append(Slice(str)); + if (s.ok()) { + block_offset_ += str.size(); + s = dest_->Close(); + dest_.reset(); + } + + last_elem_type_ = kEtFileFooter; + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + BlobLogFooter::kSize); + return s; +} + +Status Writer::AddRecord(const Slice& key, const Slice& val, + uint64_t expiration, uint64_t* key_offset, + uint64_t* blob_offset) { + assert(block_offset_ != 0); + assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); + + std::string buf; + ConstructBlobHeader(&buf, key, val, expiration); + + Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); + return s; +} + +Status Writer::AddRecord(const Slice& key, const Slice& val, + uint64_t* key_offset, uint64_t* blob_offset) { + assert(block_offset_ != 0); + assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); + + std::string buf; + ConstructBlobHeader(&buf, key, val, 0); + + Status s = EmitPhysicalRecord(buf, key, val, key_offset, blob_offset); + return s; +} + +void Writer::ConstructBlobHeader(std::string* buf, const Slice& key, + const Slice& val, uint64_t expiration) { + BlobLogRecord record; + record.key = key; + record.value = val; + record.expiration = expiration; + record.EncodeHeaderTo(buf); +} + +Status Writer::EmitPhysicalRecord(const std::string& headerbuf, + const Slice& key, const Slice& val, + uint64_t* key_offset, uint64_t* blob_offset) { + StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS); + Status s = dest_->Append(Slice(headerbuf)); + if (s.ok()) { + s = dest_->Append(key); + } + if (s.ok()) { + s = dest_->Append(val); + } + if (s.ok()) { + s = dest_->Flush(); + } + + *key_offset = block_offset_ + BlobLogRecord::kHeaderSize; + *blob_offset = *key_offset + key.size(); + block_offset_ = *blob_offset + val.size(); + last_elem_type_ = kEtRecord; + RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, + BlobLogRecord::kHeaderSize + key.size() + 
+                                       val.size());
+  return s;
+}
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/blob_db/blob_log_writer.h b/src/rocksdb/utilities/blob_db/blob_log_writer.h
new file mode 100644
index 00000000..dccac355
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_log_writer.h
@@ -0,0 +1,95 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "rocksdb/env.h"
+#include "rocksdb/slice.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+#include "rocksdb/types.h"
+#include "utilities/blob_db/blob_log_format.h"
+
+namespace rocksdb {
+
+class WritableFileWriter;
+
+namespace blob_db {
+
+/**
+ * Writer is the blob log stream writer. It provides an append-only
+ * abstraction for writing blob data.
+ *
+ * Look at blob_log_format.h to see the details of the record formats.
+ */
+
+class Writer {
+ public:
+  // Create a writer that will append data to "*dest". "*dest" is expected
+  // to be initially empty (unless a starting boffset is supplied); the
+  // writer takes ownership of it.
+  Writer(std::unique_ptr<WritableFileWriter>&& dest, Env* env,
+         Statistics* statistics, uint64_t log_number, uint64_t bpsync,
+         bool use_fsync, uint64_t boffset = 0);
+
+  ~Writer() = default;
+
+  // No copying allowed
+  Writer(const Writer&) = delete;
+  Writer& operator=(const Writer&) = delete;
+
+  static void ConstructBlobHeader(std::string* buf, const Slice& key,
+                                  const Slice& val, uint64_t expiration);
+
+  Status AddRecord(const Slice& key, const Slice& val, uint64_t* key_offset,
+                   uint64_t* blob_offset);
+
+  Status AddRecord(const Slice& key, const Slice& val, uint64_t expiration,
+                   uint64_t* key_offset, uint64_t* blob_offset);
+
+  Status EmitPhysicalRecord(const std::string& headerbuf, const Slice& key,
+                            const Slice& val, uint64_t* key_offset,
+                            uint64_t* blob_offset);
+
+  Status AppendFooter(BlobLogFooter& footer);
+
+  Status WriteHeader(BlobLogHeader& header);
+
+  WritableFileWriter* file() { return dest_.get(); }
+
+  const WritableFileWriter* file() const { return dest_.get(); }
+
+  uint64_t get_log_number() const { return log_number_; }
+
+  bool ShouldSync() const { return block_offset_ > next_sync_offset_; }
+
+  Status Sync();
+
+  void ResetSyncPointer() { next_sync_offset_ += bytes_per_sync_; }
+
+ private:
+  std::unique_ptr<WritableFileWriter> dest_;
+  Env* env_;
+  Statistics* statistics_;
+  uint64_t log_number_;
+  uint64_t block_offset_;  // Current write offset in the file
+  uint64_t bytes_per_sync_;
+  uint64_t next_sync_offset_;
+  bool use_fsync_;
+
+ public:
+  enum ElemType { kEtNone, kEtFileHdr, kEtRecord, kEtFileFooter };
+  ElemType last_elem_type_;
+};
+
+}  // namespace blob_db
+}  // namespace rocksdb
+#endif  // ROCKSDB_LITE
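+
+// Illustrative sketch (not part of this library): the intended write path
+// for the Writer above, assuming an already-created WritableFileWriter for
+// a fresh blob file; WriteOneBlobExample is a hypothetical helper:
+//
+//   rocksdb::Status WriteOneBlobExample(
+//       std::unique_ptr<rocksdb::WritableFileWriter>&& dest,
+//       rocksdb::Env* env, uint64_t file_number) {
+//     using namespace rocksdb::blob_db;
+//     Writer writer(std::move(dest), env, nullptr /* statistics */,
+//                   file_number, 1024 * 1024 /* bytes_per_sync */,
+//                   false /* use_fsync */);
+//     BlobLogHeader header;  // defaults: version 1, no TTL, no compression
+//     rocksdb::Status s = writer.WriteHeader(header);
+//     uint64_t key_offset = 0;
+//     uint64_t blob_offset = 0;
+//     if (s.ok()) {
+//       s = writer.AddRecord("key1", "blob1", &key_offset, &blob_offset);
+//     }
+//     if (s.ok()) {
+//       BlobLogFooter footer;
+//       footer.blob_count = 1;
+//       s = writer.AppendFooter(footer);  // also closes the file
+//     }
+//     return s;
+//   }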