Diffstat (limited to 'src/rocksdb/db/column_family.cc')
-rw-r--r-- | src/rocksdb/db/column_family.cc | 1683
1 file changed, 1683 insertions, 0 deletions
diff --git a/src/rocksdb/db/column_family.cc b/src/rocksdb/db/column_family.cc new file mode 100644 index 000000000..268060ddf --- /dev/null +++ b/src/rocksdb/db/column_family.cc @@ -0,0 +1,1683 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "db/column_family.h" + +#include <algorithm> +#include <cinttypes> +#include <limits> +#include <sstream> +#include <string> +#include <vector> + +#include "db/blob/blob_file_cache.h" +#include "db/blob/blob_source.h" +#include "db/compaction/compaction_picker.h" +#include "db/compaction/compaction_picker_fifo.h" +#include "db/compaction/compaction_picker_level.h" +#include "db/compaction/compaction_picker_universal.h" +#include "db/db_impl/db_impl.h" +#include "db/internal_stats.h" +#include "db/job_context.h" +#include "db/range_del_aggregator.h" +#include "db/table_properties_collector.h" +#include "db/version_set.h" +#include "db/write_controller.h" +#include "file/sst_file_manager_impl.h" +#include "logging/logging.h" +#include "monitoring/thread_status_util.h" +#include "options/options_helper.h" +#include "port/port.h" +#include "rocksdb/convenience.h" +#include "rocksdb/table.h" +#include "table/merging_iterator.h" +#include "util/autovector.h" +#include "util/cast_util.h" +#include "util/compression.h" + +namespace ROCKSDB_NAMESPACE { + +ColumnFamilyHandleImpl::ColumnFamilyHandleImpl( + ColumnFamilyData* column_family_data, DBImpl* db, InstrumentedMutex* mutex) + : cfd_(column_family_data), db_(db), mutex_(mutex) { + if (cfd_ != nullptr) { + cfd_->Ref(); + } +} + +ColumnFamilyHandleImpl::~ColumnFamilyHandleImpl() { + if (cfd_ != nullptr) { +#ifndef ROCKSDB_LITE + for (auto& listener : cfd_->ioptions()->listeners) { + listener->OnColumnFamilyHandleDeletionStarted(this); + } +#endif // ROCKSDB_LITE + // Job id == 0 means that this is not our background process, but rather + // user thread + // Need to hold some shared pointers owned by the initial_cf_options + // before final cleaning up finishes. + ColumnFamilyOptions initial_cf_options_copy = cfd_->initial_cf_options(); + JobContext job_context(0); + mutex_->Lock(); + bool dropped = cfd_->IsDropped(); + if (cfd_->UnrefAndTryDelete()) { + if (dropped) { + db_->FindObsoleteFiles(&job_context, false, true); + } + } + mutex_->Unlock(); + if (job_context.HaveSomethingToDelete()) { + bool defer_purge = + db_->immutable_db_options().avoid_unnecessary_blocking_io; + db_->PurgeObsoleteFiles(job_context, defer_purge); + } + job_context.Clean(); + } +} + +uint32_t ColumnFamilyHandleImpl::GetID() const { return cfd()->GetID(); } + +const std::string& ColumnFamilyHandleImpl::GetName() const { + return cfd()->GetName(); +} + +Status ColumnFamilyHandleImpl::GetDescriptor(ColumnFamilyDescriptor* desc) { +#ifndef ROCKSDB_LITE + // accessing mutable cf-options requires db mutex. 
+ InstrumentedMutexLock l(mutex_); + *desc = ColumnFamilyDescriptor(cfd()->GetName(), cfd()->GetLatestCFOptions()); + return Status::OK(); +#else + (void)desc; + return Status::NotSupported(); +#endif // !ROCKSDB_LITE +} + +const Comparator* ColumnFamilyHandleImpl::GetComparator() const { + return cfd()->user_comparator(); +} + +void GetIntTblPropCollectorFactory( + const ImmutableCFOptions& ioptions, + IntTblPropCollectorFactories* int_tbl_prop_collector_factories) { + assert(int_tbl_prop_collector_factories); + + auto& collector_factories = ioptions.table_properties_collector_factories; + for (size_t i = 0; i < ioptions.table_properties_collector_factories.size(); + ++i) { + assert(collector_factories[i]); + int_tbl_prop_collector_factories->emplace_back( + new UserKeyTablePropertiesCollectorFactory(collector_factories[i])); + } +} + +Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options) { + if (!cf_options.compression_per_level.empty()) { + for (size_t level = 0; level < cf_options.compression_per_level.size(); + ++level) { + if (!CompressionTypeSupported(cf_options.compression_per_level[level])) { + return Status::InvalidArgument( + "Compression type " + + CompressionTypeToString(cf_options.compression_per_level[level]) + + " is not linked with the binary."); + } + } + } else { + if (!CompressionTypeSupported(cf_options.compression)) { + return Status::InvalidArgument( + "Compression type " + + CompressionTypeToString(cf_options.compression) + + " is not linked with the binary."); + } + } + if (cf_options.compression_opts.zstd_max_train_bytes > 0) { + if (cf_options.compression_opts.use_zstd_dict_trainer) { + if (!ZSTD_TrainDictionarySupported()) { + return Status::InvalidArgument( + "zstd dictionary trainer cannot be used because ZSTD 1.1.3+ " + "is not linked with the binary."); + } + } else if (!ZSTD_FinalizeDictionarySupported()) { + return Status::InvalidArgument( + "zstd finalizeDictionary cannot be used because ZSTD 1.4.5+ " + "is not linked with the binary."); + } + if (cf_options.compression_opts.max_dict_bytes == 0) { + return Status::InvalidArgument( + "The dictionary size limit (`CompressionOptions::max_dict_bytes`) " + "should be nonzero if we're using zstd's dictionary generator."); + } + } + + if (!CompressionTypeSupported(cf_options.blob_compression_type)) { + std::ostringstream oss; + oss << "The specified blob compression type " + << CompressionTypeToString(cf_options.blob_compression_type) + << " is not available."; + + return Status::InvalidArgument(oss.str()); + } + + return Status::OK(); +} + +Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) { + if (cf_options.inplace_update_support) { + return Status::InvalidArgument( + "In-place memtable updates (inplace_update_support) is not compatible " + "with concurrent writes (allow_concurrent_memtable_write)"); + } + if (!cf_options.memtable_factory->IsInsertConcurrentlySupported()) { + return Status::InvalidArgument( + "Memtable doesn't concurrent writes (allow_concurrent_memtable_write)"); + } + return Status::OK(); +} + +Status CheckCFPathsSupported(const DBOptions& db_options, + const ColumnFamilyOptions& cf_options) { + // More than one cf_paths are supported only in universal + // and level compaction styles. This function also checks the case + // in which cf_paths is not specified, which results in db_paths + // being used. 
+ if ((cf_options.compaction_style != kCompactionStyleUniversal) && + (cf_options.compaction_style != kCompactionStyleLevel)) { + if (cf_options.cf_paths.size() > 1) { + return Status::NotSupported( + "More than one CF paths are only supported in " + "universal and level compaction styles. "); + } else if (cf_options.cf_paths.empty() && db_options.db_paths.size() > 1) { + return Status::NotSupported( + "More than one DB paths are only supported in " + "universal and level compaction styles. "); + } + } + return Status::OK(); +} + +namespace { +const uint64_t kDefaultTtl = 0xfffffffffffffffe; +const uint64_t kDefaultPeriodicCompSecs = 0xfffffffffffffffe; +} // anonymous namespace + +ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, + const ColumnFamilyOptions& src) { + ColumnFamilyOptions result = src; + size_t clamp_max = std::conditional< + sizeof(size_t) == 4, std::integral_constant<size_t, 0xffffffff>, + std::integral_constant<uint64_t, 64ull << 30>>::type::value; + ClipToRange(&result.write_buffer_size, (static_cast<size_t>(64)) << 10, + clamp_max); + // if user sets arena_block_size, we trust user to use this value. Otherwise, + // calculate a proper value from writer_buffer_size; + if (result.arena_block_size <= 0) { + result.arena_block_size = + std::min(size_t{1024 * 1024}, result.write_buffer_size / 8); + + // Align up to 4k + const size_t align = 4 * 1024; + result.arena_block_size = + ((result.arena_block_size + align - 1) / align) * align; + } + result.min_write_buffer_number_to_merge = + std::min(result.min_write_buffer_number_to_merge, + result.max_write_buffer_number - 1); + if (result.min_write_buffer_number_to_merge < 1) { + result.min_write_buffer_number_to_merge = 1; + } + + if (db_options.atomic_flush && result.min_write_buffer_number_to_merge > 1) { + ROCKS_LOG_WARN( + db_options.logger, + "Currently, if atomic_flush is true, then triggering flush for any " + "column family internally (non-manual flush) will trigger flushing " + "all column families even if the number of memtables is smaller " + "min_write_buffer_number_to_merge. Therefore, configuring " + "min_write_buffer_number_to_merge > 1 is not compatible and should " + "be satinized to 1. Not doing so will lead to data loss and " + "inconsistent state across multiple column families when WAL is " + "disabled, which is a common setting for atomic flush"); + + result.min_write_buffer_number_to_merge = 1; + } + + if (result.num_levels < 1) { + result.num_levels = 1; + } + if (result.compaction_style == kCompactionStyleLevel && + result.num_levels < 2) { + result.num_levels = 2; + } + + if (result.compaction_style == kCompactionStyleUniversal && + db_options.allow_ingest_behind && result.num_levels < 3) { + result.num_levels = 3; + } + + if (result.max_write_buffer_number < 2) { + result.max_write_buffer_number = 2; + } + // fall back max_write_buffer_number_to_maintain if + // max_write_buffer_size_to_maintain is not set + if (result.max_write_buffer_size_to_maintain < 0) { + result.max_write_buffer_size_to_maintain = + result.max_write_buffer_number * + static_cast<int64_t>(result.write_buffer_size); + } else if (result.max_write_buffer_size_to_maintain == 0 && + result.max_write_buffer_number_to_maintain < 0) { + result.max_write_buffer_number_to_maintain = result.max_write_buffer_number; + } + // bloom filter size shouldn't exceed 1/4 of memtable size. 
+ if (result.memtable_prefix_bloom_size_ratio > 0.25) { + result.memtable_prefix_bloom_size_ratio = 0.25; + } else if (result.memtable_prefix_bloom_size_ratio < 0) { + result.memtable_prefix_bloom_size_ratio = 0; + } + + if (!result.prefix_extractor) { + assert(result.memtable_factory); + Slice name = result.memtable_factory->Name(); + if (name.compare("HashSkipListRepFactory") == 0 || + name.compare("HashLinkListRepFactory") == 0) { + result.memtable_factory = std::make_shared<SkipListFactory>(); + } + } + + if (result.compaction_style == kCompactionStyleFIFO) { + // since we delete level0 files in FIFO compaction when there are too many + // of them, these options don't really mean anything + result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max(); + result.level0_stop_writes_trigger = std::numeric_limits<int>::max(); + } + + if (result.max_bytes_for_level_multiplier <= 0) { + result.max_bytes_for_level_multiplier = 1; + } + + if (result.level0_file_num_compaction_trigger == 0) { + ROCKS_LOG_WARN(db_options.logger, + "level0_file_num_compaction_trigger cannot be 0"); + result.level0_file_num_compaction_trigger = 1; + } + + if (result.level0_stop_writes_trigger < + result.level0_slowdown_writes_trigger || + result.level0_slowdown_writes_trigger < + result.level0_file_num_compaction_trigger) { + ROCKS_LOG_WARN(db_options.logger, + "This condition must be satisfied: " + "level0_stop_writes_trigger(%d) >= " + "level0_slowdown_writes_trigger(%d) >= " + "level0_file_num_compaction_trigger(%d)", + result.level0_stop_writes_trigger, + result.level0_slowdown_writes_trigger, + result.level0_file_num_compaction_trigger); + if (result.level0_slowdown_writes_trigger < + result.level0_file_num_compaction_trigger) { + result.level0_slowdown_writes_trigger = + result.level0_file_num_compaction_trigger; + } + if (result.level0_stop_writes_trigger < + result.level0_slowdown_writes_trigger) { + result.level0_stop_writes_trigger = result.level0_slowdown_writes_trigger; + } + ROCKS_LOG_WARN(db_options.logger, + "Adjust the value to " + "level0_stop_writes_trigger(%d)" + "level0_slowdown_writes_trigger(%d)" + "level0_file_num_compaction_trigger(%d)", + result.level0_stop_writes_trigger, + result.level0_slowdown_writes_trigger, + result.level0_file_num_compaction_trigger); + } + + if (result.soft_pending_compaction_bytes_limit == 0) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } else if (result.hard_pending_compaction_bytes_limit > 0 && + result.soft_pending_compaction_bytes_limit > + result.hard_pending_compaction_bytes_limit) { + result.soft_pending_compaction_bytes_limit = + result.hard_pending_compaction_bytes_limit; + } + +#ifndef ROCKSDB_LITE + // When the DB is stopped, it's possible that there are some .trash files that + // were not deleted yet, when we open the DB we will find these .trash files + // and schedule them to be deleted (or delete immediately if SstFileManager + // was not used) + auto sfm = + static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get()); + for (size_t i = 0; i < result.cf_paths.size(); i++) { + DeleteScheduler::CleanupDirectory(db_options.env, sfm, + result.cf_paths[i].path) + .PermitUncheckedError(); + } +#endif + + if (result.cf_paths.empty()) { + result.cf_paths = db_options.db_paths; + } + + if (result.level_compaction_dynamic_level_bytes) { + if (result.compaction_style != kCompactionStyleLevel) { + ROCKS_LOG_WARN(db_options.info_log.get(), + "level_compaction_dynamic_level_bytes only makes 
sense" + "for level-based compaction"); + result.level_compaction_dynamic_level_bytes = false; + } else if (result.cf_paths.size() > 1U) { + // we don't yet know how to make both of this feature and multiple + // DB path work. + ROCKS_LOG_WARN(db_options.info_log.get(), + "multiple cf_paths/db_paths and" + "level_compaction_dynamic_level_bytes" + "can't be used together"); + result.level_compaction_dynamic_level_bytes = false; + } + } + + if (result.max_compaction_bytes == 0) { + result.max_compaction_bytes = result.target_file_size_base * 25; + } + + bool is_block_based_table = (result.table_factory->IsInstanceOf( + TableFactory::kBlockBasedTableName())); + + const uint64_t kAdjustedTtl = 30 * 24 * 60 * 60; + if (result.ttl == kDefaultTtl) { + if (is_block_based_table && + result.compaction_style != kCompactionStyleFIFO) { + result.ttl = kAdjustedTtl; + } else { + result.ttl = 0; + } + } + + const uint64_t kAdjustedPeriodicCompSecs = 30 * 24 * 60 * 60; + + // Turn on periodic compactions and set them to occur once every 30 days if + // compaction filters are used and periodic_compaction_seconds is set to the + // default value. + if (result.compaction_style != kCompactionStyleFIFO) { + if ((result.compaction_filter != nullptr || + result.compaction_filter_factory != nullptr) && + result.periodic_compaction_seconds == kDefaultPeriodicCompSecs && + is_block_based_table) { + result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs; + } + } else { + // result.compaction_style == kCompactionStyleFIFO + if (result.ttl == 0) { + if (is_block_based_table) { + if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) { + result.periodic_compaction_seconds = kAdjustedPeriodicCompSecs; + } + result.ttl = result.periodic_compaction_seconds; + } + } else if (result.periodic_compaction_seconds != 0) { + result.ttl = std::min(result.ttl, result.periodic_compaction_seconds); + } + } + + // TTL compactions would work similar to Periodic Compactions in Universal in + // most of the cases. So, if ttl is set, execute the periodic compaction + // codepath. + if (result.compaction_style == kCompactionStyleUniversal && result.ttl != 0) { + if (result.periodic_compaction_seconds != 0) { + result.periodic_compaction_seconds = + std::min(result.ttl, result.periodic_compaction_seconds); + } else { + result.periodic_compaction_seconds = result.ttl; + } + } + + if (result.periodic_compaction_seconds == kDefaultPeriodicCompSecs) { + result.periodic_compaction_seconds = 0; + } + + return result; +} + +int SuperVersion::dummy = 0; +void* const SuperVersion::kSVInUse = &SuperVersion::dummy; +void* const SuperVersion::kSVObsolete = nullptr; + +SuperVersion::~SuperVersion() { + for (auto td : to_delete) { + delete td; + } +} + +SuperVersion* SuperVersion::Ref() { + refs.fetch_add(1, std::memory_order_relaxed); + return this; +} + +bool SuperVersion::Unref() { + // fetch_sub returns the previous value of ref + uint32_t previous_refs = refs.fetch_sub(1); + assert(previous_refs > 0); + return previous_refs == 1; +} + +void SuperVersion::Cleanup() { + assert(refs.load(std::memory_order_relaxed) == 0); + // Since this SuperVersion object is being deleted, + // decrement reference to the immutable MemtableList + // this SV object was pointing to. 
+ imm->Unref(&to_delete); + MemTable* m = mem->Unref(); + if (m != nullptr) { + auto* memory_usage = current->cfd()->imm()->current_memory_usage(); + assert(*memory_usage >= m->ApproximateMemoryUsage()); + *memory_usage -= m->ApproximateMemoryUsage(); + to_delete.push_back(m); + } + current->Unref(); + cfd->UnrefAndTryDelete(); +} + +void SuperVersion::Init(ColumnFamilyData* new_cfd, MemTable* new_mem, + MemTableListVersion* new_imm, Version* new_current) { + cfd = new_cfd; + mem = new_mem; + imm = new_imm; + current = new_current; + cfd->Ref(); + mem->Ref(); + imm->Ref(); + current->Ref(); + refs.store(1, std::memory_order_relaxed); +} + +namespace { +void SuperVersionUnrefHandle(void* ptr) { + // UnrefHandle is called when a thread exits or a ThreadLocalPtr gets + // destroyed. When the former happens, the thread shouldn't see kSVInUse. + // When the latter happens, only super_version_ holds a reference + // to ColumnFamilyData, so no further queries are possible. + SuperVersion* sv = static_cast<SuperVersion*>(ptr); + bool was_last_ref __attribute__((__unused__)); + was_last_ref = sv->Unref(); + // Thread-local SuperVersions can't outlive ColumnFamilyData::super_version_. + // This is important because we can't do SuperVersion cleanup here. + // That would require locking DB mutex, which would deadlock because + // SuperVersionUnrefHandle is called with locked ThreadLocalPtr mutex. + assert(!was_last_ref); +} +} // anonymous namespace + +std::vector<std::string> ColumnFamilyData::GetDbPaths() const { + std::vector<std::string> paths; + paths.reserve(ioptions_.cf_paths.size()); + for (const DbPath& db_path : ioptions_.cf_paths) { + paths.emplace_back(db_path.path); + } + return paths; +} + +const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = + std::numeric_limits<uint32_t>::max(); + +ColumnFamilyData::ColumnFamilyData( + uint32_t id, const std::string& name, Version* _dummy_versions, + Cache* _table_cache, WriteBufferManager* write_buffer_manager, + const ColumnFamilyOptions& cf_options, const ImmutableDBOptions& db_options, + const FileOptions* file_options, ColumnFamilySet* column_family_set, + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id, + const std::string& db_session_id) + : id_(id), + name_(name), + dummy_versions_(_dummy_versions), + current_(nullptr), + refs_(0), + initialized_(false), + dropped_(false), + internal_comparator_(cf_options.comparator), + initial_cf_options_(SanitizeOptions(db_options, cf_options)), + ioptions_(db_options, initial_cf_options_), + mutable_cf_options_(initial_cf_options_), + is_delete_range_supported_( + cf_options.table_factory->IsDeleteRangeSupported()), + write_buffer_manager_(write_buffer_manager), + mem_(nullptr), + imm_(ioptions_.min_write_buffer_number_to_merge, + ioptions_.max_write_buffer_number_to_maintain, + ioptions_.max_write_buffer_size_to_maintain), + super_version_(nullptr), + super_version_number_(0), + local_sv_(new ThreadLocalPtr(&SuperVersionUnrefHandle)), + next_(nullptr), + prev_(nullptr), + log_number_(0), + flush_reason_(FlushReason::kOthers), + column_family_set_(column_family_set), + queued_for_flush_(false), + queued_for_compaction_(false), + prev_compaction_needed_bytes_(0), + allow_2pc_(db_options.allow_2pc), + last_memtable_id_(0), + db_paths_registered_(false), + mempurge_used_(false) { + if (id_ != kDummyColumnFamilyDataId) { + // TODO(cc): RegisterDbPaths can be expensive, considering moving it + // outside of this constructor which might be 
called with db mutex held. + // TODO(cc): considering using ioptions_.fs, currently some tests rely on + // EnvWrapper, that's the main reason why we use env here. + Status s = ioptions_.env->RegisterDbPaths(GetDbPaths()); + if (s.ok()) { + db_paths_registered_ = true; + } else { + ROCKS_LOG_ERROR( + ioptions_.logger, + "Failed to register data paths of column family (id: %d, name: %s)", + id_, name_.c_str()); + } + } + Ref(); + + // Convert user defined table properties collector factories to internal ones. + GetIntTblPropCollectorFactory(ioptions_, &int_tbl_prop_collector_factories_); + + // if _dummy_versions is nullptr, then this is a dummy column family. + if (_dummy_versions != nullptr) { + internal_stats_.reset( + new InternalStats(ioptions_.num_levels, ioptions_.clock, this)); + table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache, + block_cache_tracer, io_tracer, + db_session_id)); + blob_file_cache_.reset( + new BlobFileCache(_table_cache, ioptions(), soptions(), id_, + internal_stats_->GetBlobFileReadHist(), io_tracer)); + blob_source_.reset(new BlobSource(ioptions(), db_id, db_session_id, + blob_file_cache_.get())); + + if (ioptions_.compaction_style == kCompactionStyleLevel) { + compaction_picker_.reset( + new LevelCompactionPicker(ioptions_, &internal_comparator_)); +#ifndef ROCKSDB_LITE + } else if (ioptions_.compaction_style == kCompactionStyleUniversal) { + compaction_picker_.reset( + new UniversalCompactionPicker(ioptions_, &internal_comparator_)); + } else if (ioptions_.compaction_style == kCompactionStyleFIFO) { + compaction_picker_.reset( + new FIFOCompactionPicker(ioptions_, &internal_comparator_)); + } else if (ioptions_.compaction_style == kCompactionStyleNone) { + compaction_picker_.reset( + new NullCompactionPicker(ioptions_, &internal_comparator_)); + ROCKS_LOG_WARN(ioptions_.logger, + "Column family %s does not use any background compaction. " + "Compactions can only be done via CompactFiles\n", + GetName().c_str()); +#endif // !ROCKSDB_LITE + } else { + ROCKS_LOG_ERROR(ioptions_.logger, + "Unable to recognize the specified compaction style %d. 
" + "Column family %s will use kCompactionStyleLevel.\n", + ioptions_.compaction_style, GetName().c_str()); + compaction_picker_.reset( + new LevelCompactionPicker(ioptions_, &internal_comparator_)); + } + + if (column_family_set_->NumberOfColumnFamilies() < 10) { + ROCKS_LOG_INFO(ioptions_.logger, + "--------------- Options for column family [%s]:\n", + name.c_str()); + initial_cf_options_.Dump(ioptions_.logger); + } else { + ROCKS_LOG_INFO(ioptions_.logger, "\t(skipping printing options)\n"); + } + } + + RecalculateWriteStallConditions(mutable_cf_options_); + + if (cf_options.table_factory->IsInstanceOf( + TableFactory::kBlockBasedTableName()) && + cf_options.table_factory->GetOptions<BlockBasedTableOptions>()) { + const BlockBasedTableOptions* bbto = + cf_options.table_factory->GetOptions<BlockBasedTableOptions>(); + const auto& options_overrides = bbto->cache_usage_options.options_overrides; + const auto file_metadata_charged = + options_overrides.at(CacheEntryRole::kFileMetadata).charged; + if (bbto->block_cache && + file_metadata_charged == CacheEntryRoleOptions::Decision::kEnabled) { + // TODO(hx235): Add a `ConcurrentCacheReservationManager` at DB scope + // responsible for reservation of `ObsoleteFileInfo` so that we can keep + // this `file_metadata_cache_res_mgr_` nonconcurrent + file_metadata_cache_res_mgr_.reset(new ConcurrentCacheReservationManager( + std::make_shared< + CacheReservationManagerImpl<CacheEntryRole::kFileMetadata>>( + bbto->block_cache))); + } + } +} + +// DB mutex held +ColumnFamilyData::~ColumnFamilyData() { + assert(refs_.load(std::memory_order_relaxed) == 0); + // remove from linked list + auto prev = prev_; + auto next = next_; + prev->next_ = next; + next->prev_ = prev; + + if (!dropped_ && column_family_set_ != nullptr) { + // If it's dropped, it's already removed from column family set + // If column_family_set_ == nullptr, this is dummy CFD and not in + // ColumnFamilySet + column_family_set_->RemoveColumnFamily(this); + } + + if (current_ != nullptr) { + current_->Unref(); + } + + // It would be wrong if this ColumnFamilyData is in flush_queue_ or + // compaction_queue_ and we destroyed it + assert(!queued_for_flush_); + assert(!queued_for_compaction_); + assert(super_version_ == nullptr); + + if (dummy_versions_ != nullptr) { + // List must be empty + assert(dummy_versions_->Next() == dummy_versions_); + bool deleted __attribute__((__unused__)); + deleted = dummy_versions_->Unref(); + assert(deleted); + } + + if (mem_ != nullptr) { + delete mem_->Unref(); + } + autovector<MemTable*> to_delete; + imm_.current()->Unref(&to_delete); + for (MemTable* m : to_delete) { + delete m; + } + + if (db_paths_registered_) { + // TODO(cc): considering using ioptions_.fs, currently some tests rely on + // EnvWrapper, that's the main reason why we use env here. + Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths()); + if (!s.ok()) { + ROCKS_LOG_ERROR( + ioptions_.logger, + "Failed to unregister data paths of column family (id: %d, name: %s)", + id_, name_.c_str()); + } + } +} + +bool ColumnFamilyData::UnrefAndTryDelete() { + int old_refs = refs_.fetch_sub(1); + assert(old_refs > 0); + + if (old_refs == 1) { + assert(super_version_ == nullptr); + delete this; + return true; + } + + if (old_refs == 2 && super_version_ != nullptr) { + // Only the super_version_ holds me + SuperVersion* sv = super_version_; + super_version_ = nullptr; + + // Release SuperVersion references kept in ThreadLocalPtr. 
+ local_sv_.reset(); + + if (sv->Unref()) { + // Note: sv will delete this ColumnFamilyData during Cleanup() + assert(sv->cfd == this); + sv->Cleanup(); + delete sv; + return true; + } + } + return false; +} + +void ColumnFamilyData::SetDropped() { + // can't drop default CF + assert(id_ != 0); + dropped_ = true; + write_controller_token_.reset(); + + // remove from column_family_set + column_family_set_->RemoveColumnFamily(this); +} + +ColumnFamilyOptions ColumnFamilyData::GetLatestCFOptions() const { + return BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); +} + +uint64_t ColumnFamilyData::OldestLogToKeep() { + auto current_log = GetLogNumber(); + + if (allow_2pc_) { + auto imm_prep_log = imm()->PrecomputeMinLogContainingPrepSection(); + auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); + + if (imm_prep_log > 0 && imm_prep_log < current_log) { + current_log = imm_prep_log; + } + + if (mem_prep_log > 0 && mem_prep_log < current_log) { + current_log = mem_prep_log; + } + } + + return current_log; +} + +const double kIncSlowdownRatio = 0.8; +const double kDecSlowdownRatio = 1 / kIncSlowdownRatio; +const double kNearStopSlowdownRatio = 0.6; +const double kDelayRecoverSlowdownRatio = 1.4; + +namespace { +// If penalize_stop is true, we further reduce slowdown rate. +std::unique_ptr<WriteControllerToken> SetupDelay( + WriteController* write_controller, uint64_t compaction_needed_bytes, + uint64_t prev_compaction_need_bytes, bool penalize_stop, + bool auto_compactions_disabled) { + const uint64_t kMinWriteRate = 16 * 1024u; // Minimum write rate 16KB/s. + + uint64_t max_write_rate = write_controller->max_delayed_write_rate(); + uint64_t write_rate = write_controller->delayed_write_rate(); + + if (auto_compactions_disabled) { + // When auto compaction is disabled, always use the value user gave. + write_rate = max_write_rate; + } else if (write_controller->NeedsDelay() && max_write_rate > kMinWriteRate) { + // If user gives rate less than kMinWriteRate, don't adjust it. + // + // If already delayed, need to adjust based on previous compaction debt. + // When there are two or more column families require delay, we always + // increase or reduce write rate based on information for one single + // column family. It is likely to be OK but we can improve if there is a + // problem. + // Ignore compaction_needed_bytes = 0 case because compaction_needed_bytes + // is only available in level-based compaction + // + // If the compaction debt stays the same as previously, we also further slow + // down. It usually means a mem table is full. It's mainly for the case + // where both of flush and compaction are much slower than the speed we + // insert to mem tables, so we need to actively slow down before we get + // feedback signal from compaction and flushes to avoid the full stop + // because of hitting the max write buffer number. + // + // If DB just falled into the stop condition, we need to further reduce + // the write rate to avoid the stop condition. + if (penalize_stop) { + // Penalize the near stop or stop condition by more aggressive slowdown. + // This is to provide the long term slowdown increase signal. + // The penalty is more than the reward of recovering to the normal + // condition. 
+ write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * + kNearStopSlowdownRatio); + if (write_rate < kMinWriteRate) { + write_rate = kMinWriteRate; + } + } else if (prev_compaction_need_bytes > 0 && + prev_compaction_need_bytes <= compaction_needed_bytes) { + write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * + kIncSlowdownRatio); + if (write_rate < kMinWriteRate) { + write_rate = kMinWriteRate; + } + } else if (prev_compaction_need_bytes > compaction_needed_bytes) { + // We are speeding up by ratio of kSlowdownRatio when we have paid + // compaction debt. But we'll never speed up to faster than the write rate + // given by users. + write_rate = static_cast<uint64_t>(static_cast<double>(write_rate) * + kDecSlowdownRatio); + if (write_rate > max_write_rate) { + write_rate = max_write_rate; + } + } + } + return write_controller->GetDelayToken(write_rate); +} + +int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, + int level0_slowdown_writes_trigger) { + // SanitizeOptions() ensures it. + assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger); + + if (level0_file_num_compaction_trigger < 0) { + return std::numeric_limits<int>::max(); + } + + const int64_t twice_level0_trigger = + static_cast<int64_t>(level0_file_num_compaction_trigger) * 2; + + const int64_t one_fourth_trigger_slowdown = + static_cast<int64_t>(level0_file_num_compaction_trigger) + + ((level0_slowdown_writes_trigger - level0_file_num_compaction_trigger) / + 4); + + assert(twice_level0_trigger >= 0); + assert(one_fourth_trigger_slowdown >= 0); + + // 1/4 of the way between L0 compaction trigger threshold and slowdown + // condition. + // Or twice as compaction trigger, if it is smaller. + int64_t res = std::min(twice_level0_trigger, one_fourth_trigger_slowdown); + if (res >= std::numeric_limits<int32_t>::max()) { + return std::numeric_limits<int32_t>::max(); + } else { + // res fits in int + return static_cast<int>(res); + } +} +} // anonymous namespace + +std::pair<WriteStallCondition, ColumnFamilyData::WriteStallCause> +ColumnFamilyData::GetWriteStallConditionAndCause( + int num_unflushed_memtables, int num_l0_files, + uint64_t num_compaction_needed_bytes, + const MutableCFOptions& mutable_cf_options, + const ImmutableCFOptions& immutable_cf_options) { + if (num_unflushed_memtables >= mutable_cf_options.max_write_buffer_number) { + return {WriteStallCondition::kStopped, WriteStallCause::kMemtableLimit}; + } else if (!mutable_cf_options.disable_auto_compactions && + num_l0_files >= mutable_cf_options.level0_stop_writes_trigger) { + return {WriteStallCondition::kStopped, WriteStallCause::kL0FileCountLimit}; + } else if (!mutable_cf_options.disable_auto_compactions && + mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && + num_compaction_needed_bytes >= + mutable_cf_options.hard_pending_compaction_bytes_limit) { + return {WriteStallCondition::kStopped, + WriteStallCause::kPendingCompactionBytes}; + } else if (mutable_cf_options.max_write_buffer_number > 3 && + num_unflushed_memtables >= + mutable_cf_options.max_write_buffer_number - 1 && + num_unflushed_memtables - 1 >= + immutable_cf_options.min_write_buffer_number_to_merge) { + return {WriteStallCondition::kDelayed, WriteStallCause::kMemtableLimit}; + } else if (!mutable_cf_options.disable_auto_compactions && + mutable_cf_options.level0_slowdown_writes_trigger >= 0 && + num_l0_files >= + mutable_cf_options.level0_slowdown_writes_trigger) { + return {WriteStallCondition::kDelayed, 
WriteStallCause::kL0FileCountLimit}; + } else if (!mutable_cf_options.disable_auto_compactions && + mutable_cf_options.soft_pending_compaction_bytes_limit > 0 && + num_compaction_needed_bytes >= + mutable_cf_options.soft_pending_compaction_bytes_limit) { + return {WriteStallCondition::kDelayed, + WriteStallCause::kPendingCompactionBytes}; + } + return {WriteStallCondition::kNormal, WriteStallCause::kNone}; +} + +WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( + const MutableCFOptions& mutable_cf_options) { + auto write_stall_condition = WriteStallCondition::kNormal; + if (current_ != nullptr) { + auto* vstorage = current_->storage_info(); + auto write_controller = column_family_set_->write_controller_; + uint64_t compaction_needed_bytes = + vstorage->estimated_compaction_needed_bytes(); + + auto write_stall_condition_and_cause = GetWriteStallConditionAndCause( + imm()->NumNotFlushed(), vstorage->l0_delay_trigger_count(), + vstorage->estimated_compaction_needed_bytes(), mutable_cf_options, + *ioptions()); + write_stall_condition = write_stall_condition_and_cause.first; + auto write_stall_cause = write_stall_condition_and_cause.second; + + bool was_stopped = write_controller->IsStopped(); + bool needed_delay = write_controller->NeedsDelay(); + + if (write_stall_condition == WriteStallCondition::kStopped && + write_stall_cause == WriteStallCause::kMemtableLimit) { + write_controller_token_ = write_controller->GetStopToken(); + internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_STOPS, 1); + ROCKS_LOG_WARN( + ioptions_.logger, + "[%s] Stopping writes because we have %d immutable memtables " + "(waiting for flush), max_write_buffer_number is set to %d", + name_.c_str(), imm()->NumNotFlushed(), + mutable_cf_options.max_write_buffer_number); + } else if (write_stall_condition == WriteStallCondition::kStopped && + write_stall_cause == WriteStallCause::kL0FileCountLimit) { + write_controller_token_ = write_controller->GetStopToken(); + internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_STOPS, 1); + if (compaction_picker_->IsLevel0CompactionInProgress()) { + internal_stats_->AddCFStats( + InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_STOPS, 1); + } + ROCKS_LOG_WARN(ioptions_.logger, + "[%s] Stopping writes because we have %d level-0 files", + name_.c_str(), vstorage->l0_delay_trigger_count()); + } else if (write_stall_condition == WriteStallCondition::kStopped && + write_stall_cause == WriteStallCause::kPendingCompactionBytes) { + write_controller_token_ = write_controller->GetStopToken(); + internal_stats_->AddCFStats( + InternalStats::PENDING_COMPACTION_BYTES_LIMIT_STOPS, 1); + ROCKS_LOG_WARN( + ioptions_.logger, + "[%s] Stopping writes because of estimated pending compaction " + "bytes %" PRIu64, + name_.c_str(), compaction_needed_bytes); + } else if (write_stall_condition == WriteStallCondition::kDelayed && + write_stall_cause == WriteStallCause::kMemtableLimit) { + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped, + mutable_cf_options.disable_auto_compactions); + internal_stats_->AddCFStats(InternalStats::MEMTABLE_LIMIT_SLOWDOWNS, 1); + ROCKS_LOG_WARN( + ioptions_.logger, + "[%s] Stalling writes because we have %d immutable memtables " + "(waiting for flush), max_write_buffer_number is set to %d " + "rate %" PRIu64, + name_.c_str(), imm()->NumNotFlushed(), + mutable_cf_options.max_write_buffer_number, + write_controller->delayed_write_rate()); + } else if (write_stall_condition == 
WriteStallCondition::kDelayed && + write_stall_cause == WriteStallCause::kL0FileCountLimit) { + // L0 is the last two files from stopping. + bool near_stop = vstorage->l0_delay_trigger_count() >= + mutable_cf_options.level0_stop_writes_trigger - 2; + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped || near_stop, + mutable_cf_options.disable_auto_compactions); + internal_stats_->AddCFStats(InternalStats::L0_FILE_COUNT_LIMIT_SLOWDOWNS, + 1); + if (compaction_picker_->IsLevel0CompactionInProgress()) { + internal_stats_->AddCFStats( + InternalStats::LOCKED_L0_FILE_COUNT_LIMIT_SLOWDOWNS, 1); + } + ROCKS_LOG_WARN(ioptions_.logger, + "[%s] Stalling writes because we have %d level-0 files " + "rate %" PRIu64, + name_.c_str(), vstorage->l0_delay_trigger_count(), + write_controller->delayed_write_rate()); + } else if (write_stall_condition == WriteStallCondition::kDelayed && + write_stall_cause == WriteStallCause::kPendingCompactionBytes) { + // If the distance to hard limit is less than 1/4 of the gap between soft + // and + // hard bytes limit, we think it is near stop and speed up the slowdown. + bool near_stop = + mutable_cf_options.hard_pending_compaction_bytes_limit > 0 && + (compaction_needed_bytes - + mutable_cf_options.soft_pending_compaction_bytes_limit) > + 3 * + (mutable_cf_options.hard_pending_compaction_bytes_limit - + mutable_cf_options.soft_pending_compaction_bytes_limit) / + 4; + + write_controller_token_ = + SetupDelay(write_controller, compaction_needed_bytes, + prev_compaction_needed_bytes_, was_stopped || near_stop, + mutable_cf_options.disable_auto_compactions); + internal_stats_->AddCFStats( + InternalStats::PENDING_COMPACTION_BYTES_LIMIT_SLOWDOWNS, 1); + ROCKS_LOG_WARN( + ioptions_.logger, + "[%s] Stalling writes because of estimated pending compaction " + "bytes %" PRIu64 " rate %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes(), + write_controller->delayed_write_rate()); + } else { + assert(write_stall_condition == WriteStallCondition::kNormal); + if (vstorage->l0_delay_trigger_count() >= + GetL0ThresholdSpeedupCompaction( + mutable_cf_options.level0_file_num_compaction_trigger, + mutable_cf_options.level0_slowdown_writes_trigger)) { + write_controller_token_ = + write_controller->GetCompactionPressureToken(); + ROCKS_LOG_INFO( + ioptions_.logger, + "[%s] Increasing compaction threads because we have %d level-0 " + "files ", + name_.c_str(), vstorage->l0_delay_trigger_count()); + } else if (vstorage->estimated_compaction_needed_bytes() >= + mutable_cf_options.soft_pending_compaction_bytes_limit / 4) { + // Increase compaction threads if bytes needed for compaction exceeds + // 1/4 of threshold for slowing down. + // If soft pending compaction byte limit is not set, always speed up + // compaction. + write_controller_token_ = + write_controller->GetCompactionPressureToken(); + if (mutable_cf_options.soft_pending_compaction_bytes_limit > 0) { + ROCKS_LOG_INFO( + ioptions_.logger, + "[%s] Increasing compaction threads because of estimated pending " + "compaction " + "bytes %" PRIu64, + name_.c_str(), vstorage->estimated_compaction_needed_bytes()); + } + } else { + write_controller_token_.reset(); + } + // If the DB recovers from delay conditions, we reward with reducing + // double the slowdown ratio. This is to balance the long term slowdown + // increase signal. 
+ if (needed_delay) { + uint64_t write_rate = write_controller->delayed_write_rate(); + write_controller->set_delayed_write_rate(static_cast<uint64_t>( + static_cast<double>(write_rate) * kDelayRecoverSlowdownRatio)); + // Set the low pri limit to be 1/4 the delayed write rate. + // Note we don't reset this value even after delay condition is relased. + // Low-pri rate will continue to apply if there is a compaction + // pressure. + write_controller->low_pri_rate_limiter()->SetBytesPerSecond(write_rate / + 4); + } + } + prev_compaction_needed_bytes_ = compaction_needed_bytes; + } + return write_stall_condition; +} + +const FileOptions* ColumnFamilyData::soptions() const { + return &(column_family_set_->file_options_); +} + +void ColumnFamilyData::SetCurrent(Version* current_version) { + current_ = current_version; +} + +uint64_t ColumnFamilyData::GetNumLiveVersions() const { + return VersionSet::GetNumLiveVersions(dummy_versions_); +} + +uint64_t ColumnFamilyData::GetTotalSstFilesSize() const { + return VersionSet::GetTotalSstFilesSize(dummy_versions_); +} + +uint64_t ColumnFamilyData::GetTotalBlobFileSize() const { + return VersionSet::GetTotalBlobFileSize(dummy_versions_); +} + +uint64_t ColumnFamilyData::GetLiveSstFilesSize() const { + return current_->GetSstFilesSize(); +} + +MemTable* ColumnFamilyData::ConstructNewMemtable( + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + return new MemTable(internal_comparator_, ioptions_, mutable_cf_options, + write_buffer_manager_, earliest_seq, id_); +} + +void ColumnFamilyData::CreateNewMemtable( + const MutableCFOptions& mutable_cf_options, SequenceNumber earliest_seq) { + if (mem_ != nullptr) { + delete mem_->Unref(); + } + SetMemtable(ConstructNewMemtable(mutable_cf_options, earliest_seq)); + mem_->Ref(); +} + +bool ColumnFamilyData::NeedsCompaction() const { + return !mutable_cf_options_.disable_auto_compactions && + compaction_picker_->NeedsCompaction(current_->storage_info()); +} + +Compaction* ColumnFamilyData::PickCompaction( + const MutableCFOptions& mutable_options, + const MutableDBOptions& mutable_db_options, LogBuffer* log_buffer) { + SequenceNumber earliest_mem_seqno = + std::min(mem_->GetEarliestSequenceNumber(), + imm_.current()->GetEarliestSequenceNumber(false)); + auto* result = compaction_picker_->PickCompaction( + GetName(), mutable_options, mutable_db_options, current_->storage_info(), + log_buffer, earliest_mem_seqno); + if (result != nullptr) { + result->SetInputVersion(current_); + } + return result; +} + +bool ColumnFamilyData::RangeOverlapWithCompaction( + const Slice& smallest_user_key, const Slice& largest_user_key, + int level) const { + return compaction_picker_->RangeOverlapWithCompaction( + smallest_user_key, largest_user_key, level); +} + +Status ColumnFamilyData::RangesOverlapWithMemtables( + const autovector<Range>& ranges, SuperVersion* super_version, + bool allow_data_in_errors, bool* overlap) { + assert(overlap != nullptr); + *overlap = false; + // Create an InternalIterator over all unflushed memtables + Arena arena; + ReadOptions read_opts; + read_opts.total_order_seek = true; + MergeIteratorBuilder merge_iter_builder(&internal_comparator_, &arena); + merge_iter_builder.AddIterator( + super_version->mem->NewIterator(read_opts, &arena)); + super_version->imm->AddIterators(read_opts, &merge_iter_builder, + false /* add_range_tombstone_iter */); + ScopedArenaIterator memtable_iter(merge_iter_builder.Finish()); + + auto read_seq = 
super_version->current->version_set()->LastSequence(); + ReadRangeDelAggregator range_del_agg(&internal_comparator_, read_seq); + auto* active_range_del_iter = super_version->mem->NewRangeTombstoneIterator( + read_opts, read_seq, false /* immutable_memtable */); + range_del_agg.AddTombstones( + std::unique_ptr<FragmentedRangeTombstoneIterator>(active_range_del_iter)); + Status status; + status = super_version->imm->AddRangeTombstoneIterators( + read_opts, nullptr /* arena */, &range_del_agg); + // AddRangeTombstoneIterators always return Status::OK. + assert(status.ok()); + + for (size_t i = 0; i < ranges.size() && status.ok() && !*overlap; ++i) { + auto* vstorage = super_version->current->storage_info(); + auto* ucmp = vstorage->InternalComparator()->user_comparator(); + InternalKey range_start(ranges[i].start, kMaxSequenceNumber, + kValueTypeForSeek); + memtable_iter->Seek(range_start.Encode()); + status = memtable_iter->status(); + ParsedInternalKey seek_result; + + if (status.ok() && memtable_iter->Valid()) { + status = ParseInternalKey(memtable_iter->key(), &seek_result, + allow_data_in_errors); + } + + if (status.ok()) { + if (memtable_iter->Valid() && + ucmp->Compare(seek_result.user_key, ranges[i].limit) <= 0) { + *overlap = true; + } else if (range_del_agg.IsRangeOverlapped(ranges[i].start, + ranges[i].limit)) { + *overlap = true; + } + } + } + return status; +} + +const int ColumnFamilyData::kCompactAllLevels = -1; +const int ColumnFamilyData::kCompactToBaseLevel = -2; + +Compaction* ColumnFamilyData::CompactRange( + const MutableCFOptions& mutable_cf_options, + const MutableDBOptions& mutable_db_options, int input_level, + int output_level, const CompactRangeOptions& compact_range_options, + const InternalKey* begin, const InternalKey* end, + InternalKey** compaction_end, bool* conflict, + uint64_t max_file_num_to_ignore, const std::string& trim_ts) { + auto* result = compaction_picker_->CompactRange( + GetName(), mutable_cf_options, mutable_db_options, + current_->storage_info(), input_level, output_level, + compact_range_options, begin, end, compaction_end, conflict, + max_file_num_to_ignore, trim_ts); + if (result != nullptr) { + result->SetInputVersion(current_); + } + return result; +} + +SuperVersion* ColumnFamilyData::GetReferencedSuperVersion(DBImpl* db) { + SuperVersion* sv = GetThreadLocalSuperVersion(db); + sv->Ref(); + if (!ReturnThreadLocalSuperVersion(sv)) { + // This Unref() corresponds to the Ref() in GetThreadLocalSuperVersion() + // when the thread-local pointer was populated. So, the Ref() earlier in + // this function still prevents the returned SuperVersion* from being + // deleted out from under the caller. + sv->Unref(); + } + return sv; +} + +SuperVersion* ColumnFamilyData::GetThreadLocalSuperVersion(DBImpl* db) { + // The SuperVersion is cached in thread local storage to avoid acquiring + // mutex when SuperVersion does not change since the last use. When a new + // SuperVersion is installed, the compaction or flush thread cleans up + // cached SuperVersion in all existing thread local storage. To avoid + // acquiring mutex for this operation, we use atomic Swap() on the thread + // local pointer to guarantee exclusive access. If the thread local pointer + // is being used while a new SuperVersion is installed, the cached + // SuperVersion can become stale. In that case, the background thread would + // have swapped in kSVObsolete. We re-check the value at when returning + // SuperVersion back to thread local, with an atomic compare and swap. 
+ // The superversion will need to be released if detected to be stale. + void* ptr = local_sv_->Swap(SuperVersion::kSVInUse); + // Invariant: + // (1) Scrape (always) installs kSVObsolete in ThreadLocal storage + // (2) the Swap above (always) installs kSVInUse, ThreadLocal storage + // should only keep kSVInUse before ReturnThreadLocalSuperVersion call + // (if no Scrape happens). + assert(ptr != SuperVersion::kSVInUse); + SuperVersion* sv = static_cast<SuperVersion*>(ptr); + if (sv == SuperVersion::kSVObsolete || + sv->version_number != super_version_number_.load()) { + RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_ACQUIRES); + SuperVersion* sv_to_delete = nullptr; + + if (sv && sv->Unref()) { + RecordTick(ioptions_.stats, NUMBER_SUPERVERSION_CLEANUPS); + db->mutex()->Lock(); + // NOTE: underlying resources held by superversion (sst files) might + // not be released until the next background job. + sv->Cleanup(); + if (db->immutable_db_options().avoid_unnecessary_blocking_io) { + db->AddSuperVersionsToFreeQueue(sv); + db->SchedulePurge(); + } else { + sv_to_delete = sv; + } + } else { + db->mutex()->Lock(); + } + sv = super_version_->Ref(); + db->mutex()->Unlock(); + + delete sv_to_delete; + } + assert(sv != nullptr); + return sv; +} + +bool ColumnFamilyData::ReturnThreadLocalSuperVersion(SuperVersion* sv) { + assert(sv != nullptr); + // Put the SuperVersion back + void* expected = SuperVersion::kSVInUse; + if (local_sv_->CompareAndSwap(static_cast<void*>(sv), expected)) { + // When we see kSVInUse in the ThreadLocal, we are sure ThreadLocal + // storage has not been altered and no Scrape has happened. The + // SuperVersion is still current. + return true; + } else { + // ThreadLocal scrape happened in the process of this GetImpl call (after + // thread local Swap() at the beginning and before CompareAndSwap()). + // This means the SuperVersion it holds is obsolete. + assert(expected == SuperVersion::kSVObsolete); + } + return false; +} + +void ColumnFamilyData::InstallSuperVersion(SuperVersionContext* sv_context, + InstrumentedMutex* db_mutex) { + db_mutex->AssertHeld(); + return InstallSuperVersion(sv_context, mutable_cf_options_); +} + +void ColumnFamilyData::InstallSuperVersion( + SuperVersionContext* sv_context, + const MutableCFOptions& mutable_cf_options) { + SuperVersion* new_superversion = sv_context->new_superversion.release(); + new_superversion->mutable_cf_options = mutable_cf_options; + new_superversion->Init(this, mem_, imm_.current(), current_); + SuperVersion* old_superversion = super_version_; + super_version_ = new_superversion; + ++super_version_number_; + super_version_->version_number = super_version_number_; + if (old_superversion == nullptr || old_superversion->current != current() || + old_superversion->mem != mem_ || + old_superversion->imm != imm_.current()) { + // Should not recalculate slow down condition if nothing has changed, since + // currently RecalculateWriteStallConditions() treats it as further slowing + // down is needed. + super_version_->write_stall_condition = + RecalculateWriteStallConditions(mutable_cf_options); + } else { + super_version_->write_stall_condition = + old_superversion->write_stall_condition; + } + if (old_superversion != nullptr) { + // Reset SuperVersions cached in thread local storage. + // This should be done before old_superversion->Unref(). That's to ensure + // that local_sv_ never holds the last reference to SuperVersion, since + // it has no means to safely do SuperVersion cleanup. 
+ ResetThreadLocalSuperVersions(); + + if (old_superversion->mutable_cf_options.write_buffer_size != + mutable_cf_options.write_buffer_size) { + mem_->UpdateWriteBufferSize(mutable_cf_options.write_buffer_size); + } + if (old_superversion->write_stall_condition != + new_superversion->write_stall_condition) { + sv_context->PushWriteStallNotification( + old_superversion->write_stall_condition, + new_superversion->write_stall_condition, GetName(), ioptions()); + } + if (old_superversion->Unref()) { + old_superversion->Cleanup(); + sv_context->superversions_to_free.push_back(old_superversion); + } + } +} + +void ColumnFamilyData::ResetThreadLocalSuperVersions() { + autovector<void*> sv_ptrs; + local_sv_->Scrape(&sv_ptrs, SuperVersion::kSVObsolete); + for (auto ptr : sv_ptrs) { + assert(ptr); + if (ptr == SuperVersion::kSVInUse) { + continue; + } + auto sv = static_cast<SuperVersion*>(ptr); + bool was_last_ref __attribute__((__unused__)); + was_last_ref = sv->Unref(); + // sv couldn't have been the last reference because + // ResetThreadLocalSuperVersions() is called before + // unref'ing super_version_. + assert(!was_last_ref); + } +} + +Status ColumnFamilyData::ValidateOptions( + const DBOptions& db_options, const ColumnFamilyOptions& cf_options) { + Status s; + s = CheckCompressionSupported(cf_options); + if (s.ok() && db_options.allow_concurrent_memtable_write) { + s = CheckConcurrentWritesSupported(cf_options); + } + if (s.ok() && db_options.unordered_write && + cf_options.max_successive_merges != 0) { + s = Status::InvalidArgument( + "max_successive_merges > 0 is incompatible with unordered_write"); + } + if (s.ok()) { + s = CheckCFPathsSupported(db_options, cf_options); + } + if (!s.ok()) { + return s; + } + + if (cf_options.ttl > 0 && cf_options.ttl != kDefaultTtl) { + if (!cf_options.table_factory->IsInstanceOf( + TableFactory::kBlockBasedTableName())) { + return Status::NotSupported( + "TTL is only supported in Block-Based Table format. "); + } + } + + if (cf_options.periodic_compaction_seconds > 0 && + cf_options.periodic_compaction_seconds != kDefaultPeriodicCompSecs) { + if (!cf_options.table_factory->IsInstanceOf( + TableFactory::kBlockBasedTableName())) { + return Status::NotSupported( + "Periodic Compaction is only supported in " + "Block-Based Table format. 
"); + } + } + + if (cf_options.enable_blob_garbage_collection) { + if (cf_options.blob_garbage_collection_age_cutoff < 0.0 || + cf_options.blob_garbage_collection_age_cutoff > 1.0) { + return Status::InvalidArgument( + "The age cutoff for blob garbage collection should be in the range " + "[0.0, 1.0]."); + } + if (cf_options.blob_garbage_collection_force_threshold < 0.0 || + cf_options.blob_garbage_collection_force_threshold > 1.0) { + return Status::InvalidArgument( + "The garbage ratio threshold for forcing blob garbage collection " + "should be in the range [0.0, 1.0]."); + } + } + + if (cf_options.compaction_style == kCompactionStyleFIFO && + db_options.max_open_files != -1 && cf_options.ttl > 0) { + return Status::NotSupported( + "FIFO compaction only supported with max_open_files = -1."); + } + + std::vector<uint32_t> supported{0, 1, 2, 4, 8}; + if (std::find(supported.begin(), supported.end(), + cf_options.memtable_protection_bytes_per_key) == + supported.end()) { + return Status::NotSupported( + "Memtable per key-value checksum protection only supports 0, 1, 2, 4 " + "or 8 bytes per key."); + } + return s; +} + +#ifndef ROCKSDB_LITE +Status ColumnFamilyData::SetOptions( + const DBOptions& db_opts, + const std::unordered_map<std::string, std::string>& options_map) { + ColumnFamilyOptions cf_opts = + BuildColumnFamilyOptions(initial_cf_options_, mutable_cf_options_); + ConfigOptions config_opts; + config_opts.mutable_options_only = true; + Status s = GetColumnFamilyOptionsFromMap(config_opts, cf_opts, options_map, + &cf_opts); + if (s.ok()) { + s = ValidateOptions(db_opts, cf_opts); + } + if (s.ok()) { + mutable_cf_options_ = MutableCFOptions(cf_opts); + mutable_cf_options_.RefreshDerivedOptions(ioptions_); + } + return s; +} +#endif // ROCKSDB_LITE + +// REQUIRES: DB mutex held +Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) { + if (initial_cf_options_.compaction_style != kCompactionStyleLevel) { + return Env::WLTH_NOT_SET; + } + if (level == 0) { + return Env::WLTH_MEDIUM; + } + int base_level = current_->storage_info()->base_level(); + + // L1: medium, L2: long, ... + if (level - base_level >= 2) { + return Env::WLTH_EXTREME; + } else if (level < base_level) { + // There is no restriction which prevents level passed in to be smaller + // than base_level. 
+ return Env::WLTH_MEDIUM; + } + return static_cast<Env::WriteLifeTimeHint>( + level - base_level + static_cast<int>(Env::WLTH_MEDIUM)); +} + +Status ColumnFamilyData::AddDirectories( + std::map<std::string, std::shared_ptr<FSDirectory>>* created_dirs) { + Status s; + assert(created_dirs != nullptr); + assert(data_dirs_.empty()); + for (auto& p : ioptions_.cf_paths) { + auto existing_dir = created_dirs->find(p.path); + + if (existing_dir == created_dirs->end()) { + std::unique_ptr<FSDirectory> path_directory; + s = DBImpl::CreateAndNewDirectory(ioptions_.fs.get(), p.path, + &path_directory); + if (!s.ok()) { + return s; + } + assert(path_directory != nullptr); + data_dirs_.emplace_back(path_directory.release()); + (*created_dirs)[p.path] = data_dirs_.back(); + } else { + data_dirs_.emplace_back(existing_dir->second); + } + } + assert(data_dirs_.size() == ioptions_.cf_paths.size()); + return s; +} + +FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const { + if (data_dirs_.empty()) { + return nullptr; + } + + assert(path_id < data_dirs_.size()); + return data_dirs_[path_id].get(); +} + +ColumnFamilySet::ColumnFamilySet(const std::string& dbname, + const ImmutableDBOptions* db_options, + const FileOptions& file_options, + Cache* table_cache, + WriteBufferManager* _write_buffer_manager, + WriteController* _write_controller, + BlockCacheTracer* const block_cache_tracer, + const std::shared_ptr<IOTracer>& io_tracer, + const std::string& db_id, + const std::string& db_session_id) + : max_column_family_(0), + file_options_(file_options), + dummy_cfd_(new ColumnFamilyData( + ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr, + nullptr, ColumnFamilyOptions(), *db_options, &file_options_, nullptr, + block_cache_tracer, io_tracer, db_id, db_session_id)), + default_cfd_cache_(nullptr), + db_name_(dbname), + db_options_(db_options), + table_cache_(table_cache), + write_buffer_manager_(_write_buffer_manager), + write_controller_(_write_controller), + block_cache_tracer_(block_cache_tracer), + io_tracer_(io_tracer), + db_id_(db_id), + db_session_id_(db_session_id) { + // initialize linked list + dummy_cfd_->prev_ = dummy_cfd_; + dummy_cfd_->next_ = dummy_cfd_; +} + +ColumnFamilySet::~ColumnFamilySet() { + while (column_family_data_.size() > 0) { + // cfd destructor will delete itself from column_family_data_ + auto cfd = column_family_data_.begin()->second; + bool last_ref __attribute__((__unused__)); + last_ref = cfd->UnrefAndTryDelete(); + assert(last_ref); + } + bool dummy_last_ref __attribute__((__unused__)); + dummy_last_ref = dummy_cfd_->UnrefAndTryDelete(); + assert(dummy_last_ref); +} + +ColumnFamilyData* ColumnFamilySet::GetDefault() const { + assert(default_cfd_cache_ != nullptr); + return default_cfd_cache_; +} + +ColumnFamilyData* ColumnFamilySet::GetColumnFamily(uint32_t id) const { + auto cfd_iter = column_family_data_.find(id); + if (cfd_iter != column_family_data_.end()) { + return cfd_iter->second; + } else { + return nullptr; + } +} + +ColumnFamilyData* ColumnFamilySet::GetColumnFamily( + const std::string& name) const { + auto cfd_iter = column_families_.find(name); + if (cfd_iter != column_families_.end()) { + auto cfd = GetColumnFamily(cfd_iter->second); + assert(cfd != nullptr); + return cfd; + } else { + return nullptr; + } +} + +uint32_t ColumnFamilySet::GetNextColumnFamilyID() { + return ++max_column_family_; +} + +uint32_t ColumnFamilySet::GetMaxColumnFamily() { return max_column_family_; } + +void ColumnFamilySet::UpdateMaxColumnFamily(uint32_t 
new_max_column_family) { + max_column_family_ = std::max(new_max_column_family, max_column_family_); +} + +size_t ColumnFamilySet::NumberOfColumnFamilies() const { + return column_families_.size(); +} + +// under a DB mutex AND write thread +ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( + const std::string& name, uint32_t id, Version* dummy_versions, + const ColumnFamilyOptions& options) { + assert(column_families_.find(name) == column_families_.end()); + ColumnFamilyData* new_cfd = new ColumnFamilyData( + id, name, dummy_versions, table_cache_, write_buffer_manager_, options, + *db_options_, &file_options_, this, block_cache_tracer_, io_tracer_, + db_id_, db_session_id_); + column_families_.insert({name, id}); + column_family_data_.insert({id, new_cfd}); + max_column_family_ = std::max(max_column_family_, id); + // add to linked list + new_cfd->next_ = dummy_cfd_; + auto prev = dummy_cfd_->prev_; + new_cfd->prev_ = prev; + prev->next_ = new_cfd; + dummy_cfd_->prev_ = new_cfd; + if (id == 0) { + default_cfd_cache_ = new_cfd; + } + return new_cfd; +} + +// under a DB mutex AND from a write thread +void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { + auto cfd_iter = column_family_data_.find(cfd->GetID()); + assert(cfd_iter != column_family_data_.end()); + column_family_data_.erase(cfd_iter); + column_families_.erase(cfd->GetName()); +} + +// under a DB mutex OR from a write thread +bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { + if (column_family_id == 0) { + // optimization for common case + current_ = column_family_set_->GetDefault(); + } else { + current_ = column_family_set_->GetColumnFamily(column_family_id); + } + handle_.SetCFD(current_); + return current_ != nullptr; +} + +uint64_t ColumnFamilyMemTablesImpl::GetLogNumber() const { + assert(current_ != nullptr); + return current_->GetLogNumber(); +} + +MemTable* ColumnFamilyMemTablesImpl::GetMemTable() const { + assert(current_ != nullptr); + return current_->mem(); +} + +ColumnFamilyHandle* ColumnFamilyMemTablesImpl::GetColumnFamilyHandle() { + assert(current_ != nullptr); + return &handle_; +} + +uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { + uint32_t column_family_id = 0; + if (column_family != nullptr) { + auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); + column_family_id = cfh->GetID(); + } + return column_family_id; +} + +const Comparator* GetColumnFamilyUserComparator( + ColumnFamilyHandle* column_family) { + if (column_family != nullptr) { + return column_family->GetComparator(); + } + return nullptr; +} + +} // namespace ROCKSDB_NAMESPACE |
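
For orientation, the write-stall logic added in ColumnFamilyData::GetWriteStallConditionAndCause() above boils down to a priority-ordered list of threshold checks: the stop conditions are tested first, then the matching delay conditions, and the first match wins. Below is a minimal, self-contained sketch of just that ordering; the option struct and names are simplified stand-ins rather than the RocksDB types, and it omits the disable_auto_compactions and min_write_buffer_number_to_merge guards the real function also applies.

// Simplified sketch of the stall-decision order implemented by
// ColumnFamilyData::GetWriteStallConditionAndCause() in the diff above.
// The Opts struct and all names below are illustrative stand-ins.
#include <cstdint>
#include <iostream>
#include <utility>

enum class Condition { kNormal, kDelayed, kStopped };
enum class Cause { kNone, kMemtableLimit, kL0FileCountLimit, kPendingCompactionBytes };

struct Opts {
  int max_write_buffer_number = 2;
  int level0_slowdown_writes_trigger = 20;
  int level0_stop_writes_trigger = 36;
  uint64_t soft_pending_compaction_bytes_limit = 64ull << 30;
  uint64_t hard_pending_compaction_bytes_limit = 256ull << 30;
};

std::pair<Condition, Cause> Classify(int memtables, int l0_files,
                                     uint64_t pending_bytes, const Opts& o) {
  // Stop conditions first, then the corresponding delay conditions.
  if (memtables >= o.max_write_buffer_number)
    return {Condition::kStopped, Cause::kMemtableLimit};
  if (l0_files >= o.level0_stop_writes_trigger)
    return {Condition::kStopped, Cause::kL0FileCountLimit};
  if (pending_bytes >= o.hard_pending_compaction_bytes_limit)
    return {Condition::kStopped, Cause::kPendingCompactionBytes};
  if (o.max_write_buffer_number > 3 && memtables >= o.max_write_buffer_number - 1)
    return {Condition::kDelayed, Cause::kMemtableLimit};
  if (l0_files >= o.level0_slowdown_writes_trigger)
    return {Condition::kDelayed, Cause::kL0FileCountLimit};
  if (pending_bytes >= o.soft_pending_compaction_bytes_limit)
    return {Condition::kDelayed, Cause::kPendingCompactionBytes};
  return {Condition::kNormal, Cause::kNone};
}

int main() {
  Opts o;
  auto r = Classify(/*memtables=*/1, /*l0_files=*/25, /*pending_bytes=*/0, o);
  std::cout << (r.first == Condition::kDelayed) << "\n";  // prints 1: L0 slowdown
}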
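
The thread-local SuperVersion caching in GetThreadLocalSuperVersion() / ReturnThreadLocalSuperVersion() above rests on two sentinel pointers, kSVInUse and kSVObsolete, exchanged atomically into each thread's slot. The following is a minimal model of only that handoff, assuming a single std::atomic<void*> in place of RocksDB's per-thread ThreadLocalPtr slot and a FakeSuperVersion stand-in; apart from the two sentinel names, everything here is an illustrative assumption, not the RocksDB implementation.

// Minimal model of the sentinel-pointer handoff used for thread-local
// SuperVersion caching in the diff above. A real ThreadLocalPtr keeps one
// slot per thread; a single atomic slot stands in for it here.
#include <atomic>
#include <cassert>

struct FakeSuperVersion { int version_number = 0; };

static int dummy = 0;
void* const kSVInUse = &dummy;       // slot is borrowed by the owning thread
void* const kSVObsolete = nullptr;   // a background thread scraped the slot

std::atomic<void*> tls_slot{kSVObsolete};

FakeSuperVersion* Acquire(FakeSuperVersion* current) {
  // Mark the slot "in use" while we inspect whatever was cached there.
  void* ptr = tls_slot.exchange(kSVInUse);
  assert(ptr != kSVInUse);  // only this thread may borrow its own slot
  auto* sv = static_cast<FakeSuperVersion*>(ptr);
  if (sv == kSVObsolete || sv->version_number != current->version_number) {
    // Cached copy is missing or stale: fall back to the current one
    // (the real code takes the DB mutex and refs super_version_ here).
    sv = current;
  }
  return sv;
}

bool Release(FakeSuperVersion* sv) {
  // Put the SuperVersion back only if nobody scraped the slot meanwhile.
  void* expected = kSVInUse;
  return tls_slot.compare_exchange_strong(expected, sv);
}

int main() {
  FakeSuperVersion current{42};
  FakeSuperVersion* sv = Acquire(&current);
  bool cached = Release(sv);
  (void)cached;  // true here: no concurrent scrape reset the slot to kSVObsolete
}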