Diffstat (limited to 'src/rocksdb/db/flush_job.cc')
-rw-r--r--  src/rocksdb/db/flush_job.cc  1094
 1 file changed, 1094 insertions(+), 0 deletions(-)
diff --git a/src/rocksdb/db/flush_job.cc b/src/rocksdb/db/flush_job.cc
new file mode 100644
index 000000000..645e42f44
--- /dev/null
+++ b/src/rocksdb/db/flush_job.cc
@@ -0,0 +1,1094 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "db/flush_job.h"
+
+#include <algorithm>
+#include <cinttypes>
+#include <vector>
+
+#include "db/builder.h"
+#include "db/db_iter.h"
+#include "db/dbformat.h"
+#include "db/event_helpers.h"
+#include "db/log_reader.h"
+#include "db/log_writer.h"
+#include "db/memtable.h"
+#include "db/memtable_list.h"
+#include "db/merge_context.h"
+#include "db/range_tombstone_fragmenter.h"
+#include "db/version_set.h"
+#include "file/file_util.h"
+#include "file/filename.h"
+#include "logging/event_logger.h"
+#include "logging/log_buffer.h"
+#include "logging/logging.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/thread_status_util.h"
+#include "port/port.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/status.h"
+#include "rocksdb/table.h"
+#include "table/merging_iterator.h"
+#include "table/table_builder.h"
+#include "table/two_level_iterator.h"
+#include "test_util/sync_point.h"
+#include "util/coding.h"
+#include "util/mutexlock.h"
+#include "util/stop_watch.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+const char* GetFlushReasonString(FlushReason flush_reason) {
+ switch (flush_reason) {
+ case FlushReason::kOthers:
+ return "Other Reasons";
+ case FlushReason::kGetLiveFiles:
+ return "Get Live Files";
+ case FlushReason::kShutDown:
+ return "Shut down";
+ case FlushReason::kExternalFileIngestion:
+ return "External File Ingestion";
+ case FlushReason::kManualCompaction:
+ return "Manual Compaction";
+ case FlushReason::kWriteBufferManager:
+ return "Write Buffer Manager";
+ case FlushReason::kWriteBufferFull:
+ return "Write Buffer Full";
+ case FlushReason::kTest:
+ return "Test";
+ case FlushReason::kDeleteFiles:
+ return "Delete Files";
+ case FlushReason::kAutoCompaction:
+ return "Auto Compaction";
+ case FlushReason::kManualFlush:
+ return "Manual Flush";
+ case FlushReason::kErrorRecovery:
+ return "Error Recovery";
+ case FlushReason::kWalFull:
+ return "WAL Full";
+ default:
+ return "Invalid";
+ }
+}
+
+FlushJob::FlushJob(
+ const std::string& dbname, ColumnFamilyData* cfd,
+ const ImmutableDBOptions& db_options,
+ const MutableCFOptions& mutable_cf_options, uint64_t max_memtable_id,
+ const FileOptions& file_options, VersionSet* versions,
+ InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
+ std::vector<SequenceNumber> existing_snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ SnapshotChecker* snapshot_checker, JobContext* job_context,
+ LogBuffer* log_buffer, FSDirectory* db_directory,
+ FSDirectory* output_file_directory, CompressionType output_compression,
+ Statistics* stats, EventLogger* event_logger, bool measure_io_stats,
+ const bool sync_output_directory, const bool write_manifest,
+ Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
+ const SeqnoToTimeMapping& seqno_time_mapping, const std::string& db_id,
+ const std::string& db_session_id, std::string full_history_ts_low,
+ BlobFileCompletionCallback* blob_callback)
+ : dbname_(dbname),
+ db_id_(db_id),
+ db_session_id_(db_session_id),
+ cfd_(cfd),
+ db_options_(db_options),
+ mutable_cf_options_(mutable_cf_options),
+ max_memtable_id_(max_memtable_id),
+ file_options_(file_options),
+ versions_(versions),
+ db_mutex_(db_mutex),
+ shutting_down_(shutting_down),
+ existing_snapshots_(std::move(existing_snapshots)),
+ earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
+ snapshot_checker_(snapshot_checker),
+ job_context_(job_context),
+ log_buffer_(log_buffer),
+ db_directory_(db_directory),
+ output_file_directory_(output_file_directory),
+ output_compression_(output_compression),
+ stats_(stats),
+ event_logger_(event_logger),
+ measure_io_stats_(measure_io_stats),
+ sync_output_directory_(sync_output_directory),
+ write_manifest_(write_manifest),
+ edit_(nullptr),
+ base_(nullptr),
+ pick_memtable_called(false),
+ thread_pri_(thread_pri),
+ io_tracer_(io_tracer),
+ clock_(db_options_.clock),
+ full_history_ts_low_(std::move(full_history_ts_low)),
+ blob_callback_(blob_callback),
+ db_impl_seqno_time_mapping_(seqno_time_mapping) {
+ // Update the thread status to indicate flush.
+ ReportStartedFlush();
+ TEST_SYNC_POINT("FlushJob::FlushJob()");
+}
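+
+// Illustrative usage sketch (simplified; based on how DBImpl drives a
+// flush, not part of this file). Under the DB mutex:
+//
+//   FlushJob flush_job(dbname, cfd, db_options, /* ...args elided... */);
+//   flush_job.PickMemTable();          // select the memtables to flush
+//   Status s = flush_job.Run(&logs_with_prep_tracker, &file_meta);
+//   // flush_job.Cancel();             // if abandoning the job before Run()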
+
+FlushJob::~FlushJob() { ThreadStatusUtil::ResetThreadStatus(); }
+
+void FlushJob::ReportStartedFlush() {
+ ThreadStatusUtil::SetColumnFamily(cfd_, cfd_->ioptions()->env,
+ db_options_.enable_thread_tracking);
+ ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_FLUSH);
+ ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
+ job_context_->job_id);
+ IOSTATS_RESET(bytes_written);
+}
+
+void FlushJob::ReportFlushInputSize(const autovector<MemTable*>& mems) {
+ uint64_t input_size = 0;
+ for (auto* mem : mems) {
+ input_size += mem->ApproximateMemoryUsage();
+ }
+ ThreadStatusUtil::IncreaseThreadOperationProperty(
+ ThreadStatus::FLUSH_BYTES_MEMTABLES, input_size);
+}
+
+void FlushJob::RecordFlushIOStats() {
+ RecordTick(stats_, FLUSH_WRITE_BYTES, IOSTATS(bytes_written));
+ ThreadStatusUtil::IncreaseThreadOperationProperty(
+ ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
+ IOSTATS_RESET(bytes_written);
+}
+
+void FlushJob::PickMemTable() {
+ db_mutex_->AssertHeld();
+ assert(!pick_memtable_called);
+ pick_memtable_called = true;
+
+  // Maximum "NextLogNumber" of the memtables to flush.
+  // When the mempurge feature is turned off, this variable is unnecessary:
+  // the memtables are implicitly sorted in increasing order of creation
+  // time, so mems_.back()->GetNextLogNumber() already equals
+  // max_next_log_number. However, when mempurge is on, the memtables are
+  // no longer sorted by creation time, so mems_.back()->GetNextLogNumber()
+  // is not necessarily equal to max_next_log_number, and this variable
+  // becomes necessary.
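+  // For example (illustrative numbers): after a mempurge, mems_ could hold
+  // memtables with NextLogNumber values {7, 5}; the maximum (7) must be
+  // used rather than mems_.back()->GetNextLogNumber() (5).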
+ uint64_t max_next_log_number = 0;
+
+ // Save the contents of the earliest memtable as a new Table
+ cfd_->imm()->PickMemtablesToFlush(max_memtable_id_, &mems_,
+ &max_next_log_number);
+ if (mems_.empty()) {
+ return;
+ }
+
+ ReportFlushInputSize(mems_);
+
+  // The memtables in mems_ are (implicitly) sorted in ascending order of
+  // their creation time. We use the first memtable's `edit` to keep the
+  // meta info for this flush.
+ MemTable* m = mems_[0];
+ edit_ = m->GetEdits();
+ edit_->SetPrevLogNumber(0);
+ // SetLogNumber(log_num) indicates logs with number smaller than log_num
+ // will no longer be picked up for recovery.
+ edit_->SetLogNumber(max_next_log_number);
+ edit_->SetColumnFamily(cfd_->GetID());
+
+ // path 0 for level 0 file.
+ meta_.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
+
+ base_ = cfd_->current();
+ base_->Ref(); // it is likely that we do not need this reference
+}
+
+Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
+ bool* switched_to_mempurge) {
+ TEST_SYNC_POINT("FlushJob::Start");
+ db_mutex_->AssertHeld();
+ assert(pick_memtable_called);
+  // The mempurge threshold can be dynamically changed, so it is saved
+  // in a local variable to keep its value consistent throughout this
+  // FlushJob::Run call.
+ double mempurge_threshold =
+ mutable_cf_options_.experimental_mempurge_threshold;
+
+ AutoThreadOperationStageUpdater stage_run(ThreadStatus::STAGE_FLUSH_RUN);
+ if (mems_.empty()) {
+ ROCKS_LOG_BUFFER(log_buffer_, "[%s] Nothing in memtable to flush",
+ cfd_->GetName().c_str());
+ return Status::OK();
+ }
+
+ // I/O measurement variables
+ PerfLevel prev_perf_level = PerfLevel::kEnableTime;
+ uint64_t prev_write_nanos = 0;
+ uint64_t prev_fsync_nanos = 0;
+ uint64_t prev_range_sync_nanos = 0;
+ uint64_t prev_prepare_write_nanos = 0;
+ uint64_t prev_cpu_write_nanos = 0;
+ uint64_t prev_cpu_read_nanos = 0;
+ if (measure_io_stats_) {
+ prev_perf_level = GetPerfLevel();
+ SetPerfLevel(PerfLevel::kEnableTime);
+ prev_write_nanos = IOSTATS(write_nanos);
+ prev_fsync_nanos = IOSTATS(fsync_nanos);
+ prev_range_sync_nanos = IOSTATS(range_sync_nanos);
+ prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
+ prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
+ prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
+ }
+ Status mempurge_s = Status::NotFound("No MemPurge.");
+ if ((mempurge_threshold > 0.0) &&
+ (cfd_->GetFlushReason() == FlushReason::kWriteBufferFull) &&
+ (!mems_.empty()) && MemPurgeDecider(mempurge_threshold) &&
+ !(db_options_.atomic_flush)) {
+ cfd_->SetMempurgeUsed();
+ mempurge_s = MemPurge();
+ if (!mempurge_s.ok()) {
+      // Mempurge is typically aborted when the output
+      // bytes cannot be contained in a single output memtable.
+ if (mempurge_s.IsAborted()) {
+ ROCKS_LOG_INFO(db_options_.info_log, "Mempurge process aborted: %s\n",
+ mempurge_s.ToString().c_str());
+ } else {
+        // However, the mempurge process can also fail for
+        // other reasons (e.g., new_mem->Add() fails).
+ ROCKS_LOG_WARN(db_options_.info_log, "Mempurge process failed: %s\n",
+ mempurge_s.ToString().c_str());
+ }
+ } else {
+ if (switched_to_mempurge) {
+ *switched_to_mempurge = true;
+ } else {
+        // The mempurge process was successful, but no 'switched_to_mempurge'
+        // pointer was provided, so there is no way to propagate the state
+        // of the flush job.
+        ROCKS_LOG_WARN(db_options_.info_log,
+                       "Mempurge process succeeded "
+                       "but no 'switched_to_mempurge' ptr provided.\n");
+ }
+ }
+ }
+ Status s;
+ if (mempurge_s.ok()) {
+ base_->Unref();
+ s = Status::OK();
+ } else {
+ // This will release and re-acquire the mutex.
+ s = WriteLevel0Table();
+ }
+
+ if (s.ok() && cfd_->IsDropped()) {
+    s = Status::ColumnFamilyDropped("Column family dropped during flush");
+ }
+ if ((s.ok() || s.IsColumnFamilyDropped()) &&
+ shutting_down_->load(std::memory_order_acquire)) {
+ s = Status::ShutdownInProgress("Database shutdown");
+ }
+
+ if (!s.ok()) {
+ cfd_->imm()->RollbackMemtableFlush(mems_, meta_.fd.GetNumber());
+ } else if (write_manifest_) {
+ TEST_SYNC_POINT("FlushJob::InstallResults");
+ // Replace immutable memtable with the generated Table
+ s = cfd_->imm()->TryInstallMemtableFlushResults(
+ cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
+ meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
+ log_buffer_, &committed_flush_jobs_info_,
+        !(mempurge_s.ok()) /* write_edit: true if no mempurge happened (or
+                              if it was aborted), but false if the mempurge
+                              succeeded: then there is no new min log number
+                              or new level-0 file path to write to the
+                              manifest. */);
+ }
+
+ if (s.ok() && file_meta != nullptr) {
+ *file_meta = meta_;
+ }
+ RecordFlushIOStats();
+
+ // When measure_io_stats_ is true, the default 512 bytes is not enough.
+ auto stream = event_logger_->LogToBuffer(log_buffer_, 1024);
+ stream << "job" << job_context_->job_id << "event"
+ << "flush_finished";
+ stream << "output_compression"
+ << CompressionTypeToString(output_compression_);
+ stream << "lsm_state";
+ stream.StartArray();
+ auto vstorage = cfd_->current()->storage_info();
+ for (int level = 0; level < vstorage->num_levels(); ++level) {
+ stream << vstorage->NumLevelFiles(level);
+ }
+ stream.EndArray();
+
+ const auto& blob_files = vstorage->GetBlobFiles();
+ if (!blob_files.empty()) {
+ assert(blob_files.front());
+ stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
+
+ assert(blob_files.back());
+ stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
+ }
+
+ stream << "immutable_memtables" << cfd_->imm()->NumNotFlushed();
+
+ if (measure_io_stats_) {
+ if (prev_perf_level != PerfLevel::kEnableTime) {
+ SetPerfLevel(prev_perf_level);
+ }
+ stream << "file_write_nanos" << (IOSTATS(write_nanos) - prev_write_nanos);
+ stream << "file_range_sync_nanos"
+ << (IOSTATS(range_sync_nanos) - prev_range_sync_nanos);
+ stream << "file_fsync_nanos" << (IOSTATS(fsync_nanos) - prev_fsync_nanos);
+ stream << "file_prepare_write_nanos"
+ << (IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos);
+ stream << "file_cpu_write_nanos"
+ << (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos);
+ stream << "file_cpu_read_nanos"
+ << (IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos);
+ }
+
+ return s;
+}
+
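+// Cancel() releases the Version reference taken in PickMemTable(); it is
+// called when the flush is abandoned before completing (e.g. on shutdown).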
+void FlushJob::Cancel() {
+ db_mutex_->AssertHeld();
+ assert(base_ != nullptr);
+ base_->Unref();
+}
+
+Status FlushJob::MemPurge() {
+ Status s;
+ db_mutex_->AssertHeld();
+ db_mutex_->Unlock();
+ assert(!mems_.empty());
+
+ // Measure purging time.
+ const uint64_t start_micros = clock_->NowMicros();
+ const uint64_t start_cpu_micros = clock_->CPUMicros();
+
+ MemTable* new_mem = nullptr;
+ // For performance/log investigation purposes:
+ // look at how much useful payload we harvest in the new_mem.
+ // This value is then printed to the DB log.
+ double new_mem_capacity = 0.0;
+
+  // Create two sets of iterators, one over the memtable data (entries
+  // from puts + deletes), and one over the memtable
+  // range tombstones (from DeleteRanges).
+ ReadOptions ro;
+ ro.total_order_seek = true;
+ Arena arena;
+ std::vector<InternalIterator*> memtables;
+ std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+ range_del_iters;
+ for (MemTable* m : mems_) {
+ memtables.push_back(m->NewIterator(ro, &arena));
+ auto* range_del_iter = m->NewRangeTombstoneIterator(
+ ro, kMaxSequenceNumber, true /* immutable_memtable */);
+ if (range_del_iter != nullptr) {
+ range_del_iters.emplace_back(range_del_iter);
+ }
+ }
+
+ assert(!memtables.empty());
+ SequenceNumber first_seqno = kMaxSequenceNumber;
+ SequenceNumber earliest_seqno = kMaxSequenceNumber;
+ // Pick first and earliest seqno as min of all first_seqno
+ // and earliest_seqno of the mempurged memtables.
+  for (const auto& mem : mems_) {
+    first_seqno = std::min(mem->GetFirstSequenceNumber(), first_seqno);
+    earliest_seqno =
+        std::min(mem->GetEarliestSequenceNumber(), earliest_seqno);
+  }
+
+ ScopedArenaIterator iter(
+ NewMergingIterator(&(cfd_->internal_comparator()), memtables.data(),
+ static_cast<int>(memtables.size()), &arena));
+
+ auto* ioptions = cfd_->ioptions();
+
+  // Position the iterator at the first key (for each user key, the most
+  // recent entry appears first in internal key order).
+ iter->SeekToFirst();
+
+ const std::string* const full_history_ts_low = &(cfd_->GetFullHistoryTsLow());
+ std::unique_ptr<CompactionRangeDelAggregator> range_del_agg(
+ new CompactionRangeDelAggregator(&(cfd_->internal_comparator()),
+ existing_snapshots_,
+ full_history_ts_low));
+ for (auto& rd_iter : range_del_iters) {
+ range_del_agg->AddTombstones(std::move(rd_iter));
+ }
+
+ // If there is valid data in the memtable,
+ // or at least range tombstones, copy over the info
+ // to the new memtable.
+ if (iter->Valid() || !range_del_agg->IsEmpty()) {
+    // maxSize is the maximum allowed size of a memtable (write_buffer_size).
+ size_t maxSize = mutable_cf_options_.write_buffer_size;
+ std::unique_ptr<CompactionFilter> compaction_filter;
+ if (ioptions->compaction_filter_factory != nullptr &&
+ ioptions->compaction_filter_factory->ShouldFilterTableFileCreation(
+ TableFileCreationReason::kFlush)) {
+ CompactionFilter::Context ctx;
+ ctx.is_full_compaction = false;
+ ctx.is_manual_compaction = false;
+ ctx.column_family_id = cfd_->GetID();
+ ctx.reason = TableFileCreationReason::kFlush;
+ compaction_filter =
+ ioptions->compaction_filter_factory->CreateCompactionFilter(ctx);
+ if (compaction_filter != nullptr &&
+ !compaction_filter->IgnoreSnapshots()) {
+ s = Status::NotSupported(
+ "CompactionFilter::IgnoreSnapshots() = false is not supported "
+ "anymore.");
+ return s;
+ }
+ }
+
+ new_mem = new MemTable((cfd_->internal_comparator()), *(cfd_->ioptions()),
+ mutable_cf_options_, cfd_->write_buffer_mgr(),
+ earliest_seqno, cfd_->GetID());
+ assert(new_mem != nullptr);
+
+ Env* env = db_options_.env;
+ assert(env);
+ MergeHelper merge(
+ env, (cfd_->internal_comparator()).user_comparator(),
+ (ioptions->merge_operator).get(), compaction_filter.get(),
+ ioptions->logger, true /* internal key corruption is not ok */,
+ existing_snapshots_.empty() ? 0 : existing_snapshots_.back(),
+ snapshot_checker_);
+ assert(job_context_);
+ SequenceNumber job_snapshot_seq = job_context_->GetJobSnapshotSequence();
+ const std::atomic<bool> kManualCompactionCanceledFalse{false};
+ CompactionIterator c_iter(
+ iter.get(), (cfd_->internal_comparator()).user_comparator(), &merge,
+ kMaxSequenceNumber, &existing_snapshots_,
+ earliest_write_conflict_snapshot_, job_snapshot_seq, snapshot_checker_,
+ env, ShouldReportDetailedTime(env, ioptions->stats),
+ true /* internal key corruption is not ok */, range_del_agg.get(),
+ nullptr, ioptions->allow_data_in_errors,
+ ioptions->enforce_single_del_contracts,
+ /*manual_compaction_canceled=*/kManualCompactionCanceledFalse,
+ /*compaction=*/nullptr, compaction_filter.get(),
+ /*shutting_down=*/nullptr, ioptions->info_log, full_history_ts_low);
+
+    // Set the earliest sequence number of the new memtable to be equal
+    // to the earliest sequence number of the memtables being flushed
+    // (see below where the first sequence number is rectified if needed).
+ new_mem->SetEarliestSequenceNumber(earliest_seqno);
+ // Likewise for first seq number.
+ new_mem->SetFirstSequenceNumber(first_seqno);
+ SequenceNumber new_first_seqno = kMaxSequenceNumber;
+
+ c_iter.SeekToFirst();
+
+ // Key transfer
+ for (; c_iter.Valid(); c_iter.Next()) {
+ const ParsedInternalKey ikey = c_iter.ikey();
+ const Slice value = c_iter.value();
+      new_first_seqno = std::min(ikey.sequence, new_first_seqno);
+
+      // TODO: should "OldestKeyTime" be updated? Timestamps appear
+      // to still be an "experimental" feature.
+ s = new_mem->Add(
+ ikey.sequence, ikey.type, ikey.user_key, value,
+ nullptr, // KV protection info set as nullptr since it
+ // should only be useful for the first add to
+ // the original memtable.
+          false,   // allow concurrent_memtable_writes_
+                   // (not seen as necessary for now).
+ nullptr, // get_post_process_info(m) must be nullptr
+ // when concurrent_memtable_writes is switched off.
+ nullptr); // hint, only used when concurrent_memtable_writes_
+ // is switched on.
+ if (!s.ok()) {
+ break;
+ }
+
+      // If new_mem has a size greater than maxSize,
+      // then roll back to the regular flush operation,
+      // and destroy new_mem.
+ if (new_mem->ApproximateMemoryUsage() > maxSize) {
+ s = Status::Aborted("Mempurge filled more than one memtable.");
+ new_mem_capacity = 1.0;
+ break;
+ }
+ }
+
+ // Check status and propagate
+ // potential error status from c_iter
+ if (!s.ok()) {
+ c_iter.status().PermitUncheckedError();
+ } else if (!c_iter.status().ok()) {
+ s = c_iter.status();
+ }
+
+ // Range tombstone transfer.
+ if (s.ok()) {
+ auto range_del_it = range_del_agg->NewIterator();
+ for (range_del_it->SeekToFirst(); range_del_it->Valid();
+ range_del_it->Next()) {
+ auto tombstone = range_del_it->Tombstone();
+        new_first_seqno = std::min(tombstone.seq_, new_first_seqno);
+ s = new_mem->Add(
+ tombstone.seq_, // Sequence number
+ kTypeRangeDeletion, // KV type
+ tombstone.start_key_, // Key is start key.
+ tombstone.end_key_, // Value is end key.
+ nullptr, // KV protection info set as nullptr since it
+ // should only be useful for the first add to
+ // the original memtable.
+            false,   // allow concurrent_memtable_writes_
+                     // (not seen as necessary for now).
+ nullptr, // get_post_process_info(m) must be nullptr
+ // when concurrent_memtable_writes is switched off.
+ nullptr); // hint, only used when concurrent_memtable_writes_
+ // is switched on.
+
+ if (!s.ok()) {
+ break;
+ }
+
+        // If new_mem has a size greater than maxSize,
+        // then roll back to the regular flush operation,
+        // and destroy new_mem.
+ if (new_mem->ApproximateMemoryUsage() > maxSize) {
+ s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+ new_mem_capacity = 1.0;
+ break;
+ }
+ }
+ }
+
+    // If everything went smoothly and new_mem contains valid data,
+    // decide whether it is flushed to storage or kept in the imm()
+    // memtable list (memory).
+ if (s.ok() && (new_first_seqno != kMaxSequenceNumber)) {
+ // Rectify the first sequence number, which (unlike the earliest seq
+ // number) needs to be present in the new memtable.
+ new_mem->SetFirstSequenceNumber(new_first_seqno);
+
+      // The new_mem is added to the list of immutable memtables
+      // only if it is filled to less than 100% capacity and isn't
+      // flagged as in need of being flushed.
+ if (new_mem->ApproximateMemoryUsage() < maxSize &&
+ !(new_mem->ShouldFlushNow())) {
+ // Construct fragmented memtable range tombstones without mutex
+ new_mem->ConstructFragmentedRangeTombstones();
+ db_mutex_->Lock();
+ uint64_t new_mem_id = mems_[0]->GetID();
+
+ new_mem->SetID(new_mem_id);
+ new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber());
+
+ // This addition will not trigger another flush, because
+ // we do not call SchedulePendingFlush().
+ cfd_->imm()->Add(new_mem, &job_context_->memtables_to_free);
+ new_mem->Ref();
+#ifndef ROCKSDB_LITE
+ // Piggyback FlushJobInfo on the first flushed memtable.
+ db_mutex_->AssertHeld();
+ meta_.fd.file_size = 0;
+ mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
+#endif // !ROCKSDB_LITE
+ db_mutex_->Unlock();
+ } else {
+ s = Status::Aborted(Slice("Mempurge filled more than one memtable."));
+ new_mem_capacity = 1.0;
+ if (new_mem) {
+ job_context_->memtables_to_free.push_back(new_mem);
+ }
+ }
+ } else {
+ // In this case, the newly allocated new_mem is empty.
+ assert(new_mem != nullptr);
+ job_context_->memtables_to_free.push_back(new_mem);
+ }
+ }
+
+  // Reacquire the mutex for the WriteLevel0Table call.
+ db_mutex_->Lock();
+
+  // If the mempurge was successful, don't write the input memtables to
+  // level 0, but do write any full output table to level 0.
+ if (s.ok()) {
+ TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeSuccessful");
+ } else {
+ TEST_SYNC_POINT("DBImpl::FlushJob:MemPurgeUnsuccessful");
+ }
+ const uint64_t micros = clock_->NowMicros() - start_micros;
+ const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Mempurge lasted %" PRIu64
+ " microseconds, and %" PRIu64
+ " cpu "
+ "microseconds. Status is %s ok. Perc capacity: %f\n",
+ cfd_->GetName().c_str(), job_context_->job_id, micros,
+ cpu_micros, s.ok() ? "" : "not", new_mem_capacity);
+
+ return s;
+}
+
+bool FlushJob::MemPurgeDecider(double threshold) {
+ // Never trigger mempurge if threshold is not a strictly positive value.
+ if (!(threshold > 0.0)) {
+ return false;
+ }
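+  // A threshold strictly greater than the number of memtables being
+  // flushed always triggers mempurge, with no sampling needed.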
+ if (threshold > (1.0 * mems_.size())) {
+ return true;
+ }
+ // Payload and useful_payload (in bytes).
+ // The useful payload ratio of a given MemTable
+ // is estimated to be useful_payload/payload.
+ uint64_t payload = 0, useful_payload = 0, entry_size = 0;
+
+  // Local variables reused repeatedly inside the for-loop
+  // when iterating over the sampled entries.
+ Slice key_slice, value_slice;
+ ParsedInternalKey res;
+ SnapshotImpl min_snapshot;
+ std::string vget;
+ Status mget_s, parse_s;
+ MergeContext merge_context;
+ SequenceNumber max_covering_tombstone_seq = 0, sqno = 0,
+ min_seqno_snapshot = 0;
+ bool get_res, can_be_useful_payload, not_in_next_mems;
+
+ // If estimated_useful_payload is > threshold,
+ // then flush to storage, else MemPurge.
+ double estimated_useful_payload = 0.0;
+ // Cochran formula for determining sample size.
+ // 95% confidence interval, 7% precision.
+ // n0 = (1.96*1.96)*0.25/(0.07*0.07) = 196.0
+ double n0 = 196.0;
+ ReadOptions ro;
+ ro.total_order_seek = true;
+
+ // Iterate over each memtable of the set.
+ for (auto mem_iter = std::begin(mems_); mem_iter != std::end(mems_);
+ mem_iter++) {
+ MemTable* mt = *mem_iter;
+
+    // Sample entries from the memtable.
+ uint64_t nentries = mt->num_entries();
+ // Corrected Cochran formula for small populations
+ // (converges to n0 for large populations).
+ uint64_t target_sample_size =
+ static_cast<uint64_t>(ceil(n0 / (1.0 + (n0 / nentries))));
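+    // Worked example (illustrative): with nentries = 1000,
+    // target_sample_size = ceil(196 / (1 + 196/1000)) = ceil(163.9) = 164;
+    // for very large memtables this converges to n0 = 196 samples.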
+ std::unordered_set<const char*> sentries = {};
+ // Populate sample entries set.
+ mt->UniqueRandomSample(target_sample_size, &sentries);
+
+    // Estimate the garbage ratio by checking whether each
+    // sampled entry is still valid.
+ for (const char* ss : sentries) {
+ key_slice = GetLengthPrefixedSlice(ss);
+ parse_s = ParseInternalKey(key_slice, &res, true /*log_err_key*/);
+ if (!parse_s.ok()) {
+ ROCKS_LOG_WARN(db_options_.info_log,
+ "Memtable Decider: ParseInternalKey did not parse "
+ "key_slice %s successfully.",
+ key_slice.data());
+ }
+
+      // Size of the entry is: key size (+ value size if KV entry).
+ entry_size = key_slice.size();
+ if (res.type == kTypeValue) {
+ value_slice =
+ GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
+ entry_size += value_slice.size();
+ }
+
+ // Count entry bytes as payload.
+ payload += entry_size;
+
+ LookupKey lkey(res.user_key, kMaxSequenceNumber);
+
+ // Paranoia: zero out these values just in case.
+ max_covering_tombstone_seq = 0;
+ sqno = 0;
+
+ // Pick the oldest existing snapshot that is more recent
+ // than the sequence number of the sampled entry.
+ min_seqno_snapshot = kMaxSequenceNumber;
+ for (SequenceNumber seq_num : existing_snapshots_) {
+ if (seq_num > res.sequence && seq_num < min_seqno_snapshot) {
+ min_seqno_snapshot = seq_num;
+ }
+ }
+ min_snapshot.number_ = min_seqno_snapshot;
+ ro.snapshot =
+ min_seqno_snapshot < kMaxSequenceNumber ? &min_snapshot : nullptr;
+
+ // Estimate if the sample entry is valid or not.
+ get_res = mt->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
+ &mget_s, &merge_context, &max_covering_tombstone_seq,
+ &sqno, ro, true /* immutable_memtable */);
+ if (!get_res) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Memtable Get returned false when Get(sampled entry). "
+ "Yet each sample entry should exist somewhere in the memtable, "
+ "unrelated to whether it has been deleted or not.");
+ }
+
+      // TODO(bjlemaire): evaluate typeMerge.
+      // This is where the sampled entry is estimated to be
+      // garbage or not. Note that this is a garbage *estimation*
+      // because we do not account for certain cases, such as
+      // CompactionFilters triggered at flush time, or the same delete
+      // having been inserted two or more times into the memtable.
+
+      // Evaluate whether the entry can be useful payload.
+ // Situation #1: entry is a KV entry, was found in the memtable mt
+ // and the sequence numbers match.
+ can_be_useful_payload = (res.type == kTypeValue) && get_res &&
+ mget_s.ok() && (sqno == res.sequence);
+
+      // Situation #2: entry is a delete entry, was found in the memtable mt
+      // (because get_res == true) and no valid KV entry is found.
+      // (note: duplicate delete entries are also taken into
+      // account here, because the sequence number 'sqno'
+      // returned by the memtable->Get(&sqno) operation is set
+      // equal to that of the most recent delete entry as well).
+ can_be_useful_payload |=
+ ((res.type == kTypeDeletion) || (res.type == kTypeSingleDeletion)) &&
+ mget_s.IsNotFound() && get_res && (sqno == res.sequence);
+
+      // If there is a chance that the entry is useful payload,
+      // verify that the entry does not appear in the subsequent memtables
+      // (memtables with greater memtable ID/larger sequence numbers).
+ if (can_be_useful_payload) {
+ not_in_next_mems = true;
+ for (auto next_mem_iter = mem_iter + 1;
+ next_mem_iter != std::end(mems_); next_mem_iter++) {
+ if ((*next_mem_iter)
+ ->Get(lkey, &vget, /*columns=*/nullptr, /*timestamp=*/nullptr,
+ &mget_s, &merge_context, &max_covering_tombstone_seq,
+ &sqno, ro, true /* immutable_memtable */)) {
+ not_in_next_mems = false;
+ break;
+ }
+ }
+ if (not_in_next_mems) {
+ useful_payload += entry_size;
+ }
+ }
+ }
+ if (payload > 0) {
+ // We use the estimated useful payload ratio to
+ // evaluate how many of the memtable bytes are useful bytes.
+ estimated_useful_payload +=
+ (mt->ApproximateMemoryUsage()) * (useful_payload * 1.0 / payload);
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "Mempurge sampling [CF %s] - found garbage ratio from "
+ "sampling: %f. Threshold is %f\n",
+ cfd_->GetName().c_str(),
+ (payload - useful_payload) * 1.0 / payload, threshold);
+ } else {
+      ROCKS_LOG_WARN(db_options_.info_log,
+                     "Mempurge sampling: null payload measured, and collected "
+                     "sample size is %zu.\n",
+                     sentries.size());
+ }
+ }
+ // We convert the total number of useful payload bytes
+ // into the proportion of memtable necessary to store all these bytes.
+ // We compare this proportion with the threshold value.
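+  // Illustrative example (hypothetical numbers): with write_buffer_size =
+  // 64 MiB and an estimated useful payload of 16 MiB, the ratio is 0.25;
+  // with a threshold of 1.0 this returns true and mempurge proceeds.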
+ return ((estimated_useful_payload / mutable_cf_options_.write_buffer_size) <
+ threshold);
+}
+
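+// Builds a level-0 table (and any blob files) from the picked memtables.
+// The db mutex is released for the duration of the table build and
+// re-acquired before returning.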
+Status FlushJob::WriteLevel0Table() {
+ AutoThreadOperationStageUpdater stage_updater(
+ ThreadStatus::STAGE_FLUSH_WRITE_L0);
+ db_mutex_->AssertHeld();
+ const uint64_t start_micros = clock_->NowMicros();
+ const uint64_t start_cpu_micros = clock_->CPUMicros();
+ Status s;
+
+ SequenceNumber smallest_seqno = mems_.front()->GetEarliestSequenceNumber();
+ if (!db_impl_seqno_time_mapping_.Empty()) {
+    // Make a local copy, as the seqno_time_mapping from db_impl is not
+    // thread safe and will be used while not holding the db_mutex.
+ seqno_to_time_mapping_ = db_impl_seqno_time_mapping_.Copy(smallest_seqno);
+ }
+
+ std::vector<BlobFileAddition> blob_file_additions;
+
+ {
+ auto write_hint = cfd_->CalculateSSTWriteHint(0);
+ Env::IOPriority io_priority = GetRateLimiterPriorityForWrite();
+ db_mutex_->Unlock();
+ if (log_buffer_) {
+ log_buffer_->FlushBufferToLog();
+ }
+ // memtables and range_del_iters store internal iterators over each data
+ // memtable and its associated range deletion memtable, respectively, at
+ // corresponding indexes.
+ std::vector<InternalIterator*> memtables;
+ std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
+ range_del_iters;
+ ReadOptions ro;
+ ro.total_order_seek = true;
+ Arena arena;
+ uint64_t total_num_entries = 0, total_num_deletes = 0;
+ uint64_t total_data_size = 0;
+ size_t total_memory_usage = 0;
+ // Used for testing:
+ uint64_t mems_size = mems_.size();
+ (void)mems_size; // avoids unused variable error when
+ // TEST_SYNC_POINT_CALLBACK not used.
+ TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:num_memtables",
+ &mems_size);
+ assert(job_context_);
+ for (MemTable* m : mems_) {
+ ROCKS_LOG_INFO(
+ db_options_.info_log,
+ "[%s] [JOB %d] Flushing memtable with next log file: %" PRIu64 "\n",
+ cfd_->GetName().c_str(), job_context_->job_id, m->GetNextLogNumber());
+ memtables.push_back(m->NewIterator(ro, &arena));
+ auto* range_del_iter = m->NewRangeTombstoneIterator(
+ ro, kMaxSequenceNumber, true /* immutable_memtable */);
+ if (range_del_iter != nullptr) {
+ range_del_iters.emplace_back(range_del_iter);
+ }
+ total_num_entries += m->num_entries();
+ total_num_deletes += m->num_deletes();
+ total_data_size += m->get_data_size();
+ total_memory_usage += m->ApproximateMemoryUsage();
+ }
+
+ event_logger_->Log() << "job" << job_context_->job_id << "event"
+ << "flush_started"
+ << "num_memtables" << mems_.size() << "num_entries"
+ << total_num_entries << "num_deletes"
+ << total_num_deletes << "total_data_size"
+ << total_data_size << "memory_usage"
+ << total_memory_usage << "flush_reason"
+ << GetFlushReasonString(cfd_->GetFlushReason());
+
+ {
+ ScopedArenaIterator iter(
+ NewMergingIterator(&cfd_->internal_comparator(), memtables.data(),
+ static_cast<int>(memtables.size()), &arena));
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": started",
+ cfd_->GetName().c_str(), job_context_->job_id,
+ meta_.fd.GetNumber());
+
+ TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
+ &output_compression_);
+ int64_t _current_time = 0;
+ auto status = clock_->GetCurrentTime(&_current_time);
+      // It is safe to proceed even if GetCurrentTime fails, so just log it.
+ if (!status.ok()) {
+ ROCKS_LOG_WARN(
+ db_options_.info_log,
+ "Failed to get current time to populate creation_time property. "
+ "Status: %s",
+ status.ToString().c_str());
+ }
+ const uint64_t current_time = static_cast<uint64_t>(_current_time);
+
+ uint64_t oldest_key_time = mems_.front()->ApproximateOldestKeyTime();
+
+ // It's not clear whether oldest_key_time is always available. In case
+ // it is not available, use current_time.
+ uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);
+
+ TEST_SYNC_POINT_CALLBACK(
+ "FlushJob::WriteLevel0Table:oldest_ancester_time",
+ &oldest_ancester_time);
+ meta_.oldest_ancester_time = oldest_ancester_time;
+ meta_.file_creation_time = current_time;
+
+ uint64_t num_input_entries = 0;
+ uint64_t memtable_payload_bytes = 0;
+ uint64_t memtable_garbage_bytes = 0;
+ IOStatus io_s;
+
+ const std::string* const full_history_ts_low =
+ (full_history_ts_low_.empty()) ? nullptr : &full_history_ts_low_;
+ TableBuilderOptions tboptions(
+ *cfd_->ioptions(), mutable_cf_options_, cfd_->internal_comparator(),
+ cfd_->int_tbl_prop_collector_factories(), output_compression_,
+ mutable_cf_options_.compression_opts, cfd_->GetID(), cfd_->GetName(),
+ 0 /* level */, false /* is_bottommost */,
+ TableFileCreationReason::kFlush, oldest_key_time, current_time,
+ db_id_, db_session_id_, 0 /* target_file_size */,
+ meta_.fd.GetNumber());
+ const SequenceNumber job_snapshot_seq =
+ job_context_->GetJobSnapshotSequence();
+ s = BuildTable(
+ dbname_, versions_, db_options_, tboptions, file_options_,
+ cfd_->table_cache(), iter.get(), std::move(range_del_iters), &meta_,
+ &blob_file_additions, existing_snapshots_,
+ earliest_write_conflict_snapshot_, job_snapshot_seq,
+ snapshot_checker_, mutable_cf_options_.paranoid_file_checks,
+ cfd_->internal_stats(), &io_s, io_tracer_,
+ BlobFileCreationReason::kFlush, seqno_to_time_mapping_, event_logger_,
+ job_context_->job_id, io_priority, &table_properties_, write_hint,
+ full_history_ts_low, blob_callback_, &num_input_entries,
+ &memtable_payload_bytes, &memtable_garbage_bytes);
+ // TODO: Cleanup io_status in BuildTable and table builders
+ assert(!s.ok() || io_s.ok());
+ io_s.PermitUncheckedError();
+ if (num_input_entries != total_num_entries && s.ok()) {
+ std::string msg = "Expected " + std::to_string(total_num_entries) +
+ " entries in memtables, but read " +
+ std::to_string(num_input_entries);
+ ROCKS_LOG_WARN(db_options_.info_log, "[%s] [JOB %d] Level-0 flush %s",
+ cfd_->GetName().c_str(), job_context_->job_id,
+ msg.c_str());
+ if (db_options_.flush_verify_memtable_count) {
+ s = Status::Corruption(msg);
+ }
+ }
+ if (tboptions.reason == TableFileCreationReason::kFlush) {
+ TEST_SYNC_POINT("DBImpl::FlushJob:Flush");
+ RecordTick(stats_, MEMTABLE_PAYLOAD_BYTES_AT_FLUSH,
+ memtable_payload_bytes);
+ RecordTick(stats_, MEMTABLE_GARBAGE_BYTES_AT_FLUSH,
+ memtable_garbage_bytes);
+ }
+ LogFlush(db_options_.info_log);
+ }
+ ROCKS_LOG_BUFFER(log_buffer_,
+ "[%s] [JOB %d] Level-0 flush table #%" PRIu64 ": %" PRIu64
+ " bytes %s"
+ "%s",
+ cfd_->GetName().c_str(), job_context_->job_id,
+ meta_.fd.GetNumber(), meta_.fd.GetFileSize(),
+ s.ToString().c_str(),
+ meta_.marked_for_compaction ? " (needs compaction)" : "");
+
+ if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
+ s = output_file_directory_->FsyncWithDirOptions(
+ IOOptions(), nullptr,
+ DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
+ }
+ TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
+ db_mutex_->Lock();
+ }
+ base_->Unref();
+
+ // Note that if file_size is zero, the file has been deleted and
+ // should not be added to the manifest.
+ const bool has_output = meta_.fd.GetFileSize() > 0;
+
+ if (s.ok() && has_output) {
+ TEST_SYNC_POINT("DBImpl::FlushJob:SSTFileCreated");
+    // If we have more than one background thread, then we cannot
+    // insert files directly into higher levels because some other
+    // threads could be concurrently producing compacted files for
+    // that key range.
+    // Add the file to L0.
+ edit_->AddFile(0 /* level */, meta_.fd.GetNumber(), meta_.fd.GetPathId(),
+ meta_.fd.GetFileSize(), meta_.smallest, meta_.largest,
+ meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
+ meta_.marked_for_compaction, meta_.temperature,
+ meta_.oldest_blob_file_number, meta_.oldest_ancester_time,
+ meta_.file_creation_time, meta_.file_checksum,
+ meta_.file_checksum_func_name, meta_.unique_id);
+
+ edit_->SetBlobFileAdditions(std::move(blob_file_additions));
+ }
+#ifndef ROCKSDB_LITE
+  // Piggyback FlushJobInfo on the first flushed memtable.
+ mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
+#endif // !ROCKSDB_LITE
+
+ // Note that here we treat flush as level 0 compaction in internal stats
+ InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
+ const uint64_t micros = clock_->NowMicros() - start_micros;
+ const uint64_t cpu_micros = clock_->CPUMicros() - start_cpu_micros;
+ stats.micros = micros;
+ stats.cpu_micros = cpu_micros;
+
+ ROCKS_LOG_INFO(db_options_.info_log,
+ "[%s] [JOB %d] Flush lasted %" PRIu64
+ " microseconds, and %" PRIu64 " cpu microseconds.\n",
+ cfd_->GetName().c_str(), job_context_->job_id, micros,
+ cpu_micros);
+
+ if (has_output) {
+ stats.bytes_written = meta_.fd.GetFileSize();
+ stats.num_output_files = 1;
+ }
+
+ const auto& blobs = edit_->GetBlobFileAdditions();
+ for (const auto& blob : blobs) {
+ stats.bytes_written_blob += blob.GetTotalBlobBytes();
+ }
+
+ stats.num_output_files_blob = static_cast<int>(blobs.size());
+
+ RecordTimeToHistogram(stats_, FLUSH_TIME, stats.micros);
+ cfd_->internal_stats()->AddCompactionStats(0 /* level */, thread_pri_, stats);
+ cfd_->internal_stats()->AddCFStats(
+ InternalStats::BYTES_FLUSHED,
+ stats.bytes_written + stats.bytes_written_blob);
+ RecordFlushIOStats();
+
+ return s;
+}
+
+Env::IOPriority FlushJob::GetRateLimiterPriorityForWrite() {
+ if (versions_ && versions_->GetColumnFamilySet() &&
+ versions_->GetColumnFamilySet()->write_controller()) {
+ WriteController* write_controller =
+ versions_->GetColumnFamilySet()->write_controller();
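+    // When user writes are stalled or delayed, raise the flush I/O
+    // priority so the flush finishes sooner and helps relieve the stall.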
+ if (write_controller->IsStopped() || write_controller->NeedsDelay()) {
+ return Env::IO_USER;
+ }
+ }
+
+ return Env::IO_HIGH;
+}
+
+#ifndef ROCKSDB_LITE
+std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
+ db_mutex_->AssertHeld();
+ std::unique_ptr<FlushJobInfo> info(new FlushJobInfo{});
+ info->cf_id = cfd_->GetID();
+ info->cf_name = cfd_->GetName();
+
+ const uint64_t file_number = meta_.fd.GetNumber();
+ info->file_path =
+ MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, file_number);
+ info->file_number = file_number;
+ info->oldest_blob_file_number = meta_.oldest_blob_file_number;
+ info->thread_id = db_options_.env->GetThreadID();
+ info->job_id = job_context_->job_id;
+ info->smallest_seqno = meta_.fd.smallest_seqno;
+ info->largest_seqno = meta_.fd.largest_seqno;
+ info->table_properties = table_properties_;
+ info->flush_reason = cfd_->GetFlushReason();
+ info->blob_compression_type = mutable_cf_options_.blob_compression_type;
+
+ // Update BlobFilesInfo.
+ for (const auto& blob_file : edit_->GetBlobFileAdditions()) {
+ BlobFileAdditionInfo blob_file_addition_info(
+ BlobFileName(cfd_->ioptions()->cf_paths.front().path,
+ blob_file.GetBlobFileNumber()) /*blob_file_path*/,
+ blob_file.GetBlobFileNumber(), blob_file.GetTotalBlobCount(),
+ blob_file.GetTotalBlobBytes());
+ info->blob_file_addition_infos.emplace_back(
+ std::move(blob_file_addition_info));
+ }
+ return info;
+}
+#endif // !ROCKSDB_LITE
+
+} // namespace ROCKSDB_NAMESPACE