diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/db_impl/db_impl_write.cc | |
parent | Initial commit. (diff) | |
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/db/db_impl/db_impl_write.cc | 2435 |
1 files changed, 2435 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_impl/db_impl_write.cc b/src/rocksdb/db/db_impl/db_impl_write.cc new file mode 100644 index 000000000..a597c168d --- /dev/null +++ b/src/rocksdb/db/db_impl/db_impl_write.cc @@ -0,0 +1,2435 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include <cinttypes> + +#include "db/db_impl/db_impl.h" +#include "db/error_handler.h" +#include "db/event_helpers.h" +#include "logging/logging.h" +#include "monitoring/perf_context_imp.h" +#include "options/options_helper.h" +#include "test_util/sync_point.h" +#include "util/cast_util.h" + +namespace ROCKSDB_NAMESPACE { +// Convenience methods +Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& val) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + return DB::Put(o, column_family, key, val); +} + +Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& val) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::Put(o, column_family, key, ts, val); +} + +Status DBImpl::PutEntity(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + + return DB::PutEntity(options, column_family, key, columns); +} + +Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& val) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); + if (!cfh->cfd()->ioptions()->merge_operator) { + return Status::NotSupported("Provide a merge_operator when opening DB"); + } else { + return DB::Merge(o, column_family, key, val); + } +} + +Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& val) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::Merge(o, column_family, key, ts, val); +} + +Status DBImpl::Delete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, const Slice& key) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + return DB::Delete(write_options, column_family, key); +} + +Status DBImpl::Delete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::Delete(write_options, column_family, key, ts); +} + +Status DBImpl::SingleDelete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, + const Slice& key) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + return DB::SingleDelete(write_options, column_family, key); +} + +Status DBImpl::SingleDelete(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::SingleDelete(write_options, column_family, key, ts); +} + +Status DBImpl::DeleteRange(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key) { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + return DB::DeleteRange(write_options, column_family, begin_key, end_key); +} + +Status DBImpl::DeleteRange(const WriteOptions& write_options, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) { + const Status s = FailIfTsMismatchCf(column_family, ts, /*ts_for_read=*/false); + if (!s.ok()) { + return s; + } + return DB::DeleteRange(write_options, column_family, begin_key, end_key, ts); +} + +void DBImpl::SetRecoverableStatePreReleaseCallback( + PreReleaseCallback* callback) { + recoverable_state_pre_release_callback_.reset(callback); +} + +Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { + Status s; + if (write_options.protection_bytes_per_key > 0) { + s = WriteBatchInternal::UpdateProtectionInfo( + my_batch, write_options.protection_bytes_per_key); + } + if (s.ok()) { + s = WriteImpl(write_options, my_batch, /*callback=*/nullptr, + /*log_used=*/nullptr); + } + return s; +} + +#ifndef ROCKSDB_LITE +Status DBImpl::WriteWithCallback(const WriteOptions& write_options, + WriteBatch* my_batch, + WriteCallback* callback) { + Status s; + if (write_options.protection_bytes_per_key > 0) { + s = WriteBatchInternal::UpdateProtectionInfo( + my_batch, write_options.protection_bytes_per_key); + } + if (s.ok()) { + s = WriteImpl(write_options, my_batch, callback, nullptr); + } + return s; +} +#endif // ROCKSDB_LITE + +// The main write queue. This is the only write queue that updates LastSequence. +// When using one write queue, the same sequence also indicates the last +// published sequence. +Status DBImpl::WriteImpl(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref, + bool disable_memtable, uint64_t* seq_used, + size_t batch_cnt, + PreReleaseCallback* pre_release_callback, + PostMemTableCallback* post_memtable_callback) { + assert(!seq_per_batch_ || batch_cnt != 0); + assert(my_batch == nullptr || my_batch->Count() == 0 || + write_options.protection_bytes_per_key == 0 || + write_options.protection_bytes_per_key == + my_batch->GetProtectionBytesPerKey()); + if (my_batch == nullptr) { + return Status::InvalidArgument("Batch is nullptr!"); + } else if (!disable_memtable && + WriteBatchInternal::TimestampsUpdateNeeded(*my_batch)) { + // If writing to memtable, then we require the caller to set/update the + // timestamps for the keys in the write batch. + // Otherwise, it means we are just writing to the WAL, and we allow + // timestamps unset for the keys in the write batch. This can happen if we + // use TransactionDB with write-committed policy, and we currently do not + // support user-defined timestamp with other policies. + // In the prepare phase, a transaction can write the batch to the WAL + // without inserting to memtable. The keys in the batch do not have to be + // assigned timestamps because they will be used only during recovery if + // there is a commit marker which includes their commit timestamp. + return Status::InvalidArgument("write batch must have timestamp(s) set"); + } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && + write_options.rate_limiter_priority != Env::IO_USER) { + return Status::InvalidArgument( + "WriteOptions::rate_limiter_priority only allows " + "Env::IO_TOTAL and Env::IO_USER due to implementation constraints"); + } else if (write_options.rate_limiter_priority != Env::IO_TOTAL && + (write_options.disableWAL || manual_wal_flush_)) { + return Status::InvalidArgument( + "WriteOptions::rate_limiter_priority currently only supports " + "rate-limiting automatic WAL flush, which requires " + "`WriteOptions::disableWAL` and " + "`DBOptions::manual_wal_flush` both set to false"); + } else if (write_options.protection_bytes_per_key != 0 && + write_options.protection_bytes_per_key != 8) { + return Status::InvalidArgument( + "`WriteOptions::protection_bytes_per_key` must be zero or eight"); + } + // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock + // grabs but does not seem thread-safe. + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_ && !tracer_->IsWriteOrderPreserved()) { + // We don't have to preserve write order so can trace anywhere. It's more + // efficient to trace here than to add latency to a phase of the log/apply + // pipeline. + // TODO: maybe handle the tracing status? + tracer_->Write(my_batch).PermitUncheckedError(); + } + } + if (write_options.sync && write_options.disableWAL) { + return Status::InvalidArgument("Sync writes has to enable WAL."); + } + if (two_write_queues_ && immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with concurrent prepares"); + } + if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) { + // TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt + return Status::NotSupported( + "pipelined_writes is not compatible with seq_per_batch"); + } + if (immutable_db_options_.unordered_write && + immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "pipelined_writes is not compatible with unordered_write"); + } + if (immutable_db_options_.enable_pipelined_write && + post_memtable_callback != nullptr) { + return Status::NotSupported( + "pipelined write currently does not honor post_memtable_callback"); + } + if (seq_per_batch_ && post_memtable_callback != nullptr) { + return Status::NotSupported( + "seq_per_batch currently does not honor post_memtable_callback"); + } + // Otherwise IsLatestPersistentState optimization does not make sense + assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || + disable_memtable); + + if (write_options.low_pri) { + Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch); + if (!s.ok()) { + return s; + } + } + + if (two_write_queues_ && disable_memtable) { + AssignOrder assign_order = + seq_per_batch_ ? kDoAssignOrder : kDontAssignOrder; + // Otherwise it is WAL-only Prepare batches in WriteCommitted policy and + // they don't consume sequence. + return WriteImplWALOnly(&nonmem_write_thread_, write_options, my_batch, + callback, log_used, log_ref, seq_used, batch_cnt, + pre_release_callback, assign_order, + kDontPublishLastSeq, disable_memtable); + } + + if (immutable_db_options_.unordered_write) { + const size_t sub_batch_cnt = batch_cnt != 0 + ? batch_cnt + // every key is a sub-batch consuming a seq + : WriteBatchInternal::Count(my_batch); + uint64_t seq = 0; + // Use a write thread to i) optimize for WAL write, ii) publish last + // sequence in in increasing order, iii) call pre_release_callback serially + Status status = WriteImplWALOnly( + &write_thread_, write_options, my_batch, callback, log_used, log_ref, + &seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder, + kDoPublishLastSeq, disable_memtable); + TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL"); + if (!status.ok()) { + return status; + } + if (seq_used) { + *seq_used = seq; + } + if (!disable_memtable) { + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeUnorderedWriteMemtable"); + status = UnorderedWriteMemtable(write_options, my_batch, callback, + log_ref, seq, sub_batch_cnt); + } + return status; + } + + if (immutable_db_options_.enable_pipelined_write) { + return PipelinedWriteImpl(write_options, my_batch, callback, log_used, + log_ref, disable_memtable, seq_used); + } + + PERF_TIMER_GUARD(write_pre_and_post_process_time); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable, batch_cnt, pre_release_callback, + post_memtable_callback); + StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); + + write_thread_.JoinBatchGroup(&w); + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + // we are a non-leader in a parallel group + + if (w.ShouldWriteToMemtable()) { + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_memtable_time); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + true /*concurrent_memtable_writes*/, seq_per_batch_, w.batch_cnt, + batch_per_txn_, write_options.memtable_insert_hint_per_batch); + + PERF_TIMER_START(write_pre_and_post_process_time); + } + + if (write_thread_.CompleteParallelMemTableWriter(&w)) { + // we're responsible for exit batch group + // TODO(myabandeh): propagate status to write_group + auto last_sequence = w.write_group->last_sequence; + for (auto* tmp_w : *(w.write_group)) { + assert(tmp_w); + if (tmp_w->post_memtable_callback) { + Status tmp_s = + (*tmp_w->post_memtable_callback)(last_sequence, disable_memtable); + // TODO: propagate the execution status of post_memtable_callback to + // caller. + assert(tmp_s.ok()); + } + } + versions_->SetLastSequence(last_sequence); + MemTableInsertStatusCheck(w.status); + write_thread_.ExitAsBatchGroupFollower(&w); + } + assert(w.state == WriteThread::STATE_COMPLETED); + // STATE_COMPLETED conditional below handles exit + } + if (w.state == WriteThread::STATE_COMPLETED) { + if (log_used != nullptr) { + *log_used = w.log_used; + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + // write is complete and leader has updated sequence + return w.FinalStatus(); + } + // else we are the leader of the write batch group + assert(w.state == WriteThread::STATE_GROUP_LEADER); + Status status; + // Once reaches this point, the current writer "w" will try to do its write + // job. It may also pick up some of the remaining writers in the "writers_" + // when it finds suitable, and finish them in the same write batch. + // This is how a write job could be done by the other writer. + WriteContext write_context; + LogContext log_context(write_options.sync); + WriteThread::WriteGroup write_group; + bool in_parallel_group = false; + uint64_t last_sequence = kMaxSequenceNumber; + + assert(!two_write_queues_ || !disable_memtable); + { + // With concurrent writes we do preprocess only in the write thread that + // also does write to memtable to avoid sync issue on shared data structure + // with the other thread + + // PreprocessWrite does its own perf timing. + PERF_TIMER_STOP(write_pre_and_post_process_time); + + status = PreprocessWrite(write_options, &log_context, &write_context); + if (!two_write_queues_) { + // Assign it after ::PreprocessWrite since the sequence might advance + // inside it by WriteRecoverableState + last_sequence = versions_->LastSequence(); + } + + PERF_TIMER_START(write_pre_and_post_process_time); + } + + // Add to log and apply to memtable. We can release the lock + // during this phase since &w is currently responsible for logging + // and protects against concurrent loggers and concurrent writes + // into memtables + + TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters"); + last_batch_group_size_ = + write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + + IOStatus io_s; + Status pre_release_cb_status; + if (status.ok()) { + // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock + // grabs but does not seem thread-safe. + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_ && tracer_->IsWriteOrderPreserved()) { + for (auto* writer : write_group) { + // TODO: maybe handle the tracing status? + tracer_->Write(writer->batch).PermitUncheckedError(); + } + } + } + // Rules for when we can update the memtable concurrently + // 1. supported by memtable + // 2. Puts are not okay if inplace_update_support + // 3. Merges are not okay + // + // Rules 1..2 are enforced by checking the options + // during startup (CheckConcurrentWritesSupported), so if + // options.allow_concurrent_memtable_write is true then they can be + // assumed to be true. Rule 3 is checked for each batch. We could + // relax rules 2 if we could prevent write batches from referring + // more than once to a particular key. + bool parallel = immutable_db_options_.allow_concurrent_memtable_write && + write_group.size > 1; + size_t total_count = 0; + size_t valid_batches = 0; + size_t total_byte_size = 0; + size_t pre_release_callback_cnt = 0; + for (auto* writer : write_group) { + assert(writer); + if (writer->CheckCallback(this)) { + valid_batches += writer->batch_cnt; + if (writer->ShouldWriteToMemtable()) { + total_count += WriteBatchInternal::Count(writer->batch); + parallel = parallel && !writer->batch->HasMerge(); + } + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + if (writer->pre_release_callback) { + pre_release_callback_cnt++; + } + } + } + // Note about seq_per_batch_: either disableWAL is set for the entire write + // group or not. In either case we inc seq for each write batch with no + // failed callback. This means that there could be a batch with + // disalbe_memtable in between; although we do not write this batch to + // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc + // the seq per valid written key to mem. + size_t seq_inc = seq_per_batch_ ? valid_batches : total_count; + + const bool concurrent_update = two_write_queues_; + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. + // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count, + concurrent_update); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size, + concurrent_update); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1, + concurrent_update); + RecordTick(stats_, WRITE_DONE_BY_SELF); + auto write_done_by_other = write_group.size - 1; + if (write_done_by_other > 0) { + stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, + write_done_by_other, concurrent_update); + RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); + } + RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + + if (write_options.disableWAL) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + if (!two_write_queues_) { + if (status.ok() && !write_options.disableWAL) { + assert(log_context.log_file_number_size); + LogFileNumberSize& log_file_number_size = + *(log_context.log_file_number_size); + PERF_TIMER_GUARD(write_wal_time); + io_s = + WriteToWAL(write_group, log_context.writer, log_used, + log_context.need_log_sync, log_context.need_log_dir_sync, + last_sequence + 1, log_file_number_size); + } + } else { + if (status.ok() && !write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + // LastAllocatedSequence is increased inside WriteToWAL under + // wal_write_mutex_ to ensure ordered events in WAL + io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, + seq_inc); + } else { + // Otherwise we inc seq number for memtable writes + last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); + } + } + status = io_s; + assert(last_sequence != kMaxSequenceNumber); + const SequenceNumber current_sequence = last_sequence + 1; + last_sequence += seq_inc; + + // PreReleaseCallback is called after WAL write and before memtable write + if (status.ok()) { + SequenceNumber next_sequence = current_sequence; + size_t index = 0; + // Note: the logic for advancing seq here must be consistent with the + // logic in WriteBatchInternal::InsertInto(write_group...) as well as + // with WriteBatchInternal::InsertInto(write_batch...) that is called on + // the merged batch during recovery from the WAL. + for (auto* writer : write_group) { + if (writer->CallbackFailed()) { + continue; + } + writer->sequence = next_sequence; + if (writer->pre_release_callback) { + Status ws = writer->pre_release_callback->Callback( + writer->sequence, disable_memtable, writer->log_used, index++, + pre_release_callback_cnt); + if (!ws.ok()) { + status = pre_release_cb_status = ws; + break; + } + } + if (seq_per_batch_) { + assert(writer->batch_cnt); + next_sequence += writer->batch_cnt; + } else if (writer->ShouldWriteToMemtable()) { + next_sequence += WriteBatchInternal::Count(writer->batch); + } + } + } + + if (status.ok()) { + PERF_TIMER_GUARD(write_memtable_time); + + if (!parallel) { + // w.sequence will be set inside InsertInto + w.status = WriteBatchInternal::InsertInto( + write_group, current_sequence, column_family_memtables_.get(), + &flush_scheduler_, &trim_history_scheduler_, + write_options.ignore_missing_column_families, + 0 /*recovery_log_number*/, this, parallel, seq_per_batch_, + batch_per_txn_); + } else { + write_group.last_sequence = last_sequence; + write_thread_.LaunchParallelMemTableWriters(&write_group); + in_parallel_group = true; + + // Each parallel follower is doing each own writes. The leader should + // also do its own. + if (w.ShouldWriteToMemtable()) { + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + assert(w.sequence == current_sequence); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, + this, true /*concurrent_memtable_writes*/, seq_per_batch_, + w.batch_cnt, batch_per_txn_, + write_options.memtable_insert_hint_per_batch); + } + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + } + } + PERF_TIMER_START(write_pre_and_post_process_time); + + if (!io_s.ok()) { + // Check WriteToWAL status + IOStatusCheck(io_s); + } + if (!w.CallbackFailed()) { + if (!io_s.ok()) { + assert(pre_release_cb_status.ok()); + } else { + WriteStatusCheck(pre_release_cb_status); + } + } else { + assert(pre_release_cb_status.ok()); + } + + if (log_context.need_log_sync) { + VersionEdit synced_wals; + log_write_mutex_.Lock(); + if (status.ok()) { + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); + } else { + MarkLogsNotSynced(logfile_number_); + } + log_write_mutex_.Unlock(); + if (status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + status = ApplyWALToManifest(&synced_wals); + } + + // Requesting sync with two_write_queues_ is expected to be very rare. We + // hence provide a simple implementation that is not necessarily efficient. + if (two_write_queues_) { + if (manual_wal_flush_) { + status = FlushWAL(true); + } else { + status = SyncWAL(); + } + } + } + + bool should_exit_batch_group = true; + if (in_parallel_group) { + // CompleteParallelWorker returns true if this thread should + // handle exit, false means somebody else did + should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); + } + if (should_exit_batch_group) { + if (status.ok()) { + for (auto* tmp_w : write_group) { + assert(tmp_w); + if (tmp_w->post_memtable_callback) { + Status tmp_s = + (*tmp_w->post_memtable_callback)(last_sequence, disable_memtable); + // TODO: propagate the execution status of post_memtable_callback to + // caller. + assert(tmp_s.ok()); + } + } + // Note: if we are to resume after non-OK statuses we need to revisit how + // we reacts to non-OK statuses here. + versions_->SetLastSequence(last_sequence); + } + MemTableInsertStatusCheck(w.status); + write_thread_.ExitAsBatchGroupLeader(write_group, status); + } + + if (status.ok()) { + status = w.FinalStatus(); + } + return status; +} + +Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, + uint64_t* log_used, uint64_t log_ref, + bool disable_memtable, uint64_t* seq_used) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); + + WriteContext write_context; + + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable, /*_batch_cnt=*/0, + /*_pre_release_callback=*/nullptr); + write_thread_.JoinBatchGroup(&w); + TEST_SYNC_POINT("DBImplWrite::PipelinedWriteImpl:AfterJoinBatchGroup"); + if (w.state == WriteThread::STATE_GROUP_LEADER) { + WriteThread::WriteGroup wal_write_group; + if (w.callback && !w.callback->AllowWriteBatching()) { + write_thread_.WaitForMemTableWriters(); + } + LogContext log_context(!write_options.disableWAL && write_options.sync); + // PreprocessWrite does its own perf timing. + PERF_TIMER_STOP(write_pre_and_post_process_time); + w.status = PreprocessWrite(write_options, &log_context, &write_context); + PERF_TIMER_START(write_pre_and_post_process_time); + + // This can set non-OK status if callback fail. + last_batch_group_size_ = + write_thread_.EnterAsBatchGroupLeader(&w, &wal_write_group); + const SequenceNumber current_sequence = + write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1; + size_t total_count = 0; + size_t total_byte_size = 0; + + if (w.status.ok()) { + // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock + // grabs but does not seem thread-safe. + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) { + for (auto* writer : wal_write_group) { + // TODO: maybe handle the tracing status? + tracer_->Write(writer->batch).PermitUncheckedError(); + } + } + } + SequenceNumber next_sequence = current_sequence; + for (auto* writer : wal_write_group) { + assert(writer); + if (writer->CheckCallback(this)) { + if (writer->ShouldWriteToMemtable()) { + writer->sequence = next_sequence; + size_t count = WriteBatchInternal::Count(writer->batch); + next_sequence += count; + total_count += count; + } + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + } + } + if (w.disable_wal) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + write_thread_.UpdateLastSequence(current_sequence + total_count - 1); + } + + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + IOStatus io_s; + io_s.PermitUncheckedError(); // Allow io_s to be uninitialized + + if (w.status.ok() && !write_options.disableWAL) { + PERF_TIMER_GUARD(write_wal_time); + stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1); + RecordTick(stats_, WRITE_DONE_BY_SELF, 1); + if (wal_write_group.size > 1) { + stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, + wal_write_group.size - 1); + RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); + } + assert(log_context.log_file_number_size); + LogFileNumberSize& log_file_number_size = + *(log_context.log_file_number_size); + io_s = + WriteToWAL(wal_write_group, log_context.writer, log_used, + log_context.need_log_sync, log_context.need_log_dir_sync, + current_sequence, log_file_number_size); + w.status = io_s; + } + + if (!io_s.ok()) { + // Check WriteToWAL status + IOStatusCheck(io_s); + } else if (!w.CallbackFailed()) { + WriteStatusCheck(w.status); + } + + VersionEdit synced_wals; + if (log_context.need_log_sync) { + InstrumentedMutexLock l(&log_write_mutex_); + if (w.status.ok()) { + MarkLogsSynced(logfile_number_, log_context.need_log_dir_sync, + &synced_wals); + } else { + MarkLogsNotSynced(logfile_number_); + } + } + if (w.status.ok() && synced_wals.IsWalAddition()) { + InstrumentedMutexLock l(&mutex_); + w.status = ApplyWALToManifest(&synced_wals); + } + write_thread_.ExitAsBatchGroupLeader(wal_write_group, w.status); + } + + // NOTE: the memtable_write_group is declared before the following + // `if` statement because its lifetime needs to be longer + // that the inner context of the `if` as a reference to it + // may be used further below within the outer _write_thread + WriteThread::WriteGroup memtable_write_group; + + if (w.state == WriteThread::STATE_MEMTABLE_WRITER_LEADER) { + PERF_TIMER_GUARD(write_memtable_time); + assert(w.ShouldWriteToMemtable()); + write_thread_.EnterAsMemTableWriter(&w, &memtable_write_group); + if (memtable_write_group.size > 1 && + immutable_db_options_.allow_concurrent_memtable_write) { + write_thread_.LaunchParallelMemTableWriters(&memtable_write_group); + } else { + memtable_write_group.status = WriteBatchInternal::InsertInto( + memtable_write_group, w.sequence, column_family_memtables_.get(), + &flush_scheduler_, &trim_history_scheduler_, + write_options.ignore_missing_column_families, 0 /*log_number*/, this, + false /*concurrent_memtable_writes*/, seq_per_batch_, batch_per_txn_); + versions_->SetLastSequence(memtable_write_group.last_sequence); + write_thread_.ExitAsMemTableWriter(&w, memtable_write_group); + } + } else { + // NOTE: the memtable_write_group is never really used, + // so we need to set its status to pass ASSERT_STATUS_CHECKED + memtable_write_group.status.PermitUncheckedError(); + } + + if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) { + assert(w.ShouldWriteToMemtable()); + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, + false /*seq_per_batch*/, 0 /*batch_cnt*/, true /*batch_per_txn*/, + write_options.memtable_insert_hint_per_batch); + if (write_thread_.CompleteParallelMemTableWriter(&w)) { + MemTableInsertStatusCheck(w.status); + versions_->SetLastSequence(w.write_group->last_sequence); + write_thread_.ExitAsMemTableWriter(&w, *w.write_group); + } + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + + assert(w.state == WriteThread::STATE_COMPLETED); + return w.FinalStatus(); +} + +Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options, + WriteBatch* my_batch, + WriteCallback* callback, uint64_t log_ref, + SequenceNumber seq, + const size_t sub_batch_cnt) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); + + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + false /*disable_memtable*/); + + if (w.CheckCallback(this) && w.ShouldWriteToMemtable()) { + w.sequence = seq; + size_t total_count = WriteBatchInternal::Count(my_batch); + InternalStats* stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count); + RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count); + + ColumnFamilyMemTablesImpl column_family_memtables( + versions_->GetColumnFamilySet()); + w.status = WriteBatchInternal::InsertInto( + &w, w.sequence, &column_family_memtables, &flush_scheduler_, + &trim_history_scheduler_, write_options.ignore_missing_column_families, + 0 /*log_number*/, this, true /*concurrent_memtable_writes*/, + seq_per_batch_, sub_batch_cnt, true /*batch_per_txn*/, + write_options.memtable_insert_hint_per_batch); + if (write_options.disableWAL) { + has_unpersisted_data_.store(true, std::memory_order_relaxed); + } + } + + size_t pending_cnt = pending_memtable_writes_.fetch_sub(1) - 1; + if (pending_cnt == 0) { + // switch_cv_ waits until pending_memtable_writes_ = 0. Locking its mutex + // before notify ensures that cv is in waiting state when it is notified + // thus not missing the update to pending_memtable_writes_ even though it is + // not modified under the mutex. + std::lock_guard<std::mutex> lck(switch_mutex_); + switch_cv_.notify_all(); + } + WriteStatusCheck(w.status); + + if (!w.FinalStatus().ok()) { + return w.FinalStatus(); + } + return Status::OK(); +} + +// The 2nd write queue. If enabled it will be used only for WAL-only writes. +// This is the only queue that updates LastPublishedSequence which is only +// applicable in a two-queue setting. +Status DBImpl::WriteImplWALOnly( + WriteThread* write_thread, const WriteOptions& write_options, + WriteBatch* my_batch, WriteCallback* callback, uint64_t* log_used, + const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt, + PreReleaseCallback* pre_release_callback, const AssignOrder assign_order, + const PublishLastSeq publish_last_seq, const bool disable_memtable) { + PERF_TIMER_GUARD(write_pre_and_post_process_time); + WriteThread::Writer w(write_options, my_batch, callback, log_ref, + disable_memtable, sub_batch_cnt, pre_release_callback); + StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); + + write_thread->JoinBatchGroup(&w); + assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER); + if (w.state == WriteThread::STATE_COMPLETED) { + if (log_used != nullptr) { + *log_used = w.log_used; + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + return w.FinalStatus(); + } + // else we are the leader of the write batch group + assert(w.state == WriteThread::STATE_GROUP_LEADER); + + if (publish_last_seq == kDoPublishLastSeq) { + Status status; + + // Currently we only use kDoPublishLastSeq in unordered_write + assert(immutable_db_options_.unordered_write); + WriteContext write_context; + if (error_handler_.IsDBStopped()) { + status = error_handler_.GetBGError(); + } + // TODO(myabandeh): Make preliminary checks thread-safe so we could do them + // without paying the cost of obtaining the mutex. + if (status.ok()) { + LogContext log_context; + status = PreprocessWrite(write_options, &log_context, &write_context); + WriteStatusCheckOnLocked(status); + } + if (!status.ok()) { + WriteThread::WriteGroup write_group; + write_thread->EnterAsBatchGroupLeader(&w, &write_group); + write_thread->ExitAsBatchGroupLeader(write_group, status); + return status; + } + } + + WriteThread::WriteGroup write_group; + uint64_t last_sequence; + write_thread->EnterAsBatchGroupLeader(&w, &write_group); + // Note: no need to update last_batch_group_size_ here since the batch writes + // to WAL only + // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock + // grabs but does not seem thread-safe. + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_ != nullptr && tracer_->IsWriteOrderPreserved()) { + for (auto* writer : write_group) { + // TODO: maybe handle the tracing status? + tracer_->Write(writer->batch).PermitUncheckedError(); + } + } + } + + size_t pre_release_callback_cnt = 0; + size_t total_byte_size = 0; + for (auto* writer : write_group) { + assert(writer); + if (writer->CheckCallback(this)) { + total_byte_size = WriteBatchInternal::AppendedByteSize( + total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); + if (writer->pre_release_callback) { + pre_release_callback_cnt++; + } + } + } + + const bool concurrent_update = true; + // Update stats while we are an exclusive group leader, so we know + // that nobody else can be writing to these particular stats. + // We're optimistic, updating the stats before we successfully + // commit. That lets us release our leader status early. + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size, + concurrent_update); + RecordTick(stats_, BYTES_WRITTEN, total_byte_size); + stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1, + concurrent_update); + RecordTick(stats_, WRITE_DONE_BY_SELF); + auto write_done_by_other = write_group.size - 1; + if (write_done_by_other > 0) { + stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther, + write_done_by_other, concurrent_update); + RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); + } + RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size); + + PERF_TIMER_STOP(write_pre_and_post_process_time); + + PERF_TIMER_GUARD(write_wal_time); + // LastAllocatedSequence is increased inside WriteToWAL under + // wal_write_mutex_ to ensure ordered events in WAL + size_t seq_inc = 0 /* total_count */; + if (assign_order == kDoAssignOrder) { + size_t total_batch_cnt = 0; + for (auto* writer : write_group) { + assert(writer->batch_cnt || !seq_per_batch_); + if (!writer->CallbackFailed()) { + total_batch_cnt += writer->batch_cnt; + } + } + seq_inc = total_batch_cnt; + } + Status status; + if (!write_options.disableWAL) { + IOStatus io_s = + ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc); + status = io_s; + // last_sequence may not be set if there is an error + // This error checking and return is moved up to avoid using uninitialized + // last_sequence. + if (!io_s.ok()) { + IOStatusCheck(io_s); + write_thread->ExitAsBatchGroupLeader(write_group, status); + return status; + } + } else { + // Otherwise we inc seq number to do solely the seq allocation + last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); + } + + size_t memtable_write_cnt = 0; + auto curr_seq = last_sequence + 1; + for (auto* writer : write_group) { + if (writer->CallbackFailed()) { + continue; + } + writer->sequence = curr_seq; + if (assign_order == kDoAssignOrder) { + assert(writer->batch_cnt || !seq_per_batch_); + curr_seq += writer->batch_cnt; + } + if (!writer->disable_memtable) { + memtable_write_cnt++; + } + // else seq advances only by memtable writes + } + if (status.ok() && write_options.sync) { + assert(!write_options.disableWAL); + // Requesting sync with two_write_queues_ is expected to be very rare. We + // hance provide a simple implementation that is not necessarily efficient. + if (manual_wal_flush_) { + status = FlushWAL(true); + } else { + status = SyncWAL(); + } + } + PERF_TIMER_START(write_pre_and_post_process_time); + + if (!w.CallbackFailed()) { + WriteStatusCheck(status); + } + if (status.ok()) { + size_t index = 0; + for (auto* writer : write_group) { + if (!writer->CallbackFailed() && writer->pre_release_callback) { + assert(writer->sequence != kMaxSequenceNumber); + Status ws = writer->pre_release_callback->Callback( + writer->sequence, disable_memtable, writer->log_used, index++, + pre_release_callback_cnt); + if (!ws.ok()) { + status = ws; + break; + } + } + } + } + if (publish_last_seq == kDoPublishLastSeq) { + versions_->SetLastSequence(last_sequence + seq_inc); + // Currently we only use kDoPublishLastSeq in unordered_write + assert(immutable_db_options_.unordered_write); + } + if (immutable_db_options_.unordered_write && status.ok()) { + pending_memtable_writes_ += memtable_write_cnt; + } + write_thread->ExitAsBatchGroupLeader(write_group, status); + if (status.ok()) { + status = w.FinalStatus(); + } + if (seq_used != nullptr) { + *seq_used = w.sequence; + } + return status; +} + +void DBImpl::WriteStatusCheckOnLocked(const Status& status) { + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + InstrumentedMutexLock l(&mutex_); + assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok()); + if (immutable_db_options_.paranoid_checks && !status.ok() && + !status.IsBusy() && !status.IsIncomplete()) { + // Maybe change the return status to void? + error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); + } +} + +void DBImpl::WriteStatusCheck(const Status& status) { + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + assert(!status.IsIOFenced() || !error_handler_.GetBGError().ok()); + if (immutable_db_options_.paranoid_checks && !status.ok() && + !status.IsBusy() && !status.IsIncomplete()) { + mutex_.Lock(); + // Maybe change the return status to void? + error_handler_.SetBGError(status, BackgroundErrorReason::kWriteCallback); + mutex_.Unlock(); + } +} + +void DBImpl::IOStatusCheck(const IOStatus& io_status) { + // Is setting bg_error_ enough here? This will at least stop + // compaction and fail any further writes. + if ((immutable_db_options_.paranoid_checks && !io_status.ok() && + !io_status.IsBusy() && !io_status.IsIncomplete()) || + io_status.IsIOFenced()) { + mutex_.Lock(); + // Maybe change the return status to void? + error_handler_.SetBGError(io_status, BackgroundErrorReason::kWriteCallback); + mutex_.Unlock(); + } else { + // Force writable file to be continue writable. + logs_.back().writer->file()->reset_seen_error(); + } +} + +void DBImpl::MemTableInsertStatusCheck(const Status& status) { + // A non-OK status here indicates that the state implied by the + // WAL has diverged from the in-memory state. This could be + // because of a corrupt write_batch (very bad), or because the + // client specified an invalid column family and didn't specify + // ignore_missing_column_families. + if (!status.ok()) { + mutex_.Lock(); + assert(!error_handler_.IsBGWorkStopped()); + // Maybe change the return status to void? + error_handler_.SetBGError(status, BackgroundErrorReason::kMemTable) + .PermitUncheckedError(); + mutex_.Unlock(); + } +} + +Status DBImpl::PreprocessWrite(const WriteOptions& write_options, + LogContext* log_context, + WriteContext* write_context) { + assert(write_context != nullptr && log_context != nullptr); + Status status; + + if (error_handler_.IsDBStopped()) { + InstrumentedMutexLock l(&mutex_); + status = error_handler_.GetBGError(); + } + + PERF_TIMER_GUARD(write_scheduling_flushes_compactions_time); + + if (UNLIKELY(status.ok() && total_log_size_ > GetMaxTotalWalSize())) { + assert(versions_); + InstrumentedMutexLock l(&mutex_); + const ColumnFamilySet* const column_families = + versions_->GetColumnFamilySet(); + assert(column_families); + size_t num_cfs = column_families->NumberOfColumnFamilies(); + assert(num_cfs >= 1); + if (num_cfs > 1) { + WaitForPendingWrites(); + status = SwitchWAL(write_context); + } + } + + if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldFlush())) { + // Before a new memtable is added in SwitchMemtable(), + // write_buffer_manager_->ShouldFlush() will keep returning true. If another + // thread is writing to another DB with the same write buffer, they may also + // be flushed. We may end up with flushing much more DBs than needed. It's + // suboptimal but still correct. + InstrumentedMutexLock l(&mutex_); + WaitForPendingWrites(); + status = HandleWriteBufferManagerFlush(write_context); + } + + if (UNLIKELY(status.ok() && !trim_history_scheduler_.Empty())) { + InstrumentedMutexLock l(&mutex_); + status = TrimMemtableHistory(write_context); + } + + if (UNLIKELY(status.ok() && !flush_scheduler_.Empty())) { + InstrumentedMutexLock l(&mutex_); + WaitForPendingWrites(); + status = ScheduleFlushes(write_context); + } + + PERF_TIMER_STOP(write_scheduling_flushes_compactions_time); + PERF_TIMER_GUARD(write_pre_and_post_process_time); + + if (UNLIKELY(status.ok() && (write_controller_.IsStopped() || + write_controller_.NeedsDelay()))) { + PERF_TIMER_STOP(write_pre_and_post_process_time); + PERF_TIMER_GUARD(write_delay_time); + // We don't know size of curent batch so that we always use the size + // for previous one. It might create a fairness issue that expiration + // might happen for smaller writes but larger writes can go through. + // Can optimize it if it is an issue. + InstrumentedMutexLock l(&mutex_); + status = DelayWrite(last_batch_group_size_, write_options); + PERF_TIMER_START(write_pre_and_post_process_time); + } + + // If memory usage exceeded beyond a certain threshold, + // write_buffer_manager_->ShouldStall() returns true to all threads writing to + // all DBs and writers will be stalled. + // It does soft checking because WriteBufferManager::buffer_limit_ has already + // exceeded at this point so no new write (including current one) will go + // through until memory usage is decreased. + if (UNLIKELY(status.ok() && write_buffer_manager_->ShouldStall())) { + if (write_options.no_slowdown) { + status = Status::Incomplete("Write stall"); + } else { + InstrumentedMutexLock l(&mutex_); + WriteBufferManagerStallWrites(); + } + } + InstrumentedMutexLock l(&log_write_mutex_); + if (status.ok() && log_context->need_log_sync) { + // Wait until the parallel syncs are finished. Any sync process has to sync + // the front log too so it is enough to check the status of front() + // We do a while loop since log_sync_cv_ is signalled when any sync is + // finished + // Note: there does not seem to be a reason to wait for parallel sync at + // this early step but it is not important since parallel sync (SyncWAL) and + // need_log_sync are usually not used together. + while (logs_.front().IsSyncing()) { + log_sync_cv_.Wait(); + } + for (auto& log : logs_) { + // This is just to prevent the logs to be synced by a parallel SyncWAL + // call. We will do the actual syncing later after we will write to the + // WAL. + // Note: there does not seem to be a reason to set this early before we + // actually write to the WAL + log.PrepareForSync(); + } + } else { + log_context->need_log_sync = false; + } + log_context->writer = logs_.back().writer; + log_context->need_log_dir_sync = + log_context->need_log_dir_sync && !log_dir_synced_; + log_context->log_file_number_size = std::addressof(alive_log_files_.back()); + + return status; +} + +Status DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group, + WriteBatch* tmp_batch, WriteBatch** merged_batch, + size_t* write_with_wal, + WriteBatch** to_be_cached_state) { + assert(write_with_wal != nullptr); + assert(tmp_batch != nullptr); + assert(*to_be_cached_state == nullptr); + *write_with_wal = 0; + auto* leader = write_group.leader; + assert(!leader->disable_wal); // Same holds for all in the batch group + if (write_group.size == 1 && !leader->CallbackFailed() && + leader->batch->GetWalTerminationPoint().is_cleared()) { + // we simply write the first WriteBatch to WAL if the group only + // contains one batch, that batch should be written to the WAL, + // and the batch is not wanting to be truncated + *merged_batch = leader->batch; + if (WriteBatchInternal::IsLatestPersistentState(*merged_batch)) { + *to_be_cached_state = *merged_batch; + } + *write_with_wal = 1; + } else { + // WAL needs all of the batches flattened into a single batch. + // We could avoid copying here with an iov-like AddRecord + // interface + *merged_batch = tmp_batch; + for (auto writer : write_group) { + if (!writer->CallbackFailed()) { + Status s = WriteBatchInternal::Append(*merged_batch, writer->batch, + /*WAL_only*/ true); + if (!s.ok()) { + tmp_batch->Clear(); + return s; + } + if (WriteBatchInternal::IsLatestPersistentState(writer->batch)) { + // We only need to cache the last of such write batch + *to_be_cached_state = writer->batch; + } + (*write_with_wal)++; + } + } + } + // return merged_batch; + return Status::OK(); +} + +// When two_write_queues_ is disabled, this function is called from the only +// write thread. Otherwise this must be called holding log_write_mutex_. +IOStatus DBImpl::WriteToWAL(const WriteBatch& merged_batch, + log::Writer* log_writer, uint64_t* log_used, + uint64_t* log_size, + Env::IOPriority rate_limiter_priority, + LogFileNumberSize& log_file_number_size) { + assert(log_size != nullptr); + + Slice log_entry = WriteBatchInternal::Contents(&merged_batch); + TEST_SYNC_POINT_CALLBACK("DBImpl::WriteToWAL:log_entry", &log_entry); + auto s = merged_batch.VerifyChecksum(); + if (!s.ok()) { + return status_to_io_status(std::move(s)); + } + *log_size = log_entry.size(); + // When two_write_queues_ WriteToWAL has to be protected from concurretn calls + // from the two queues anyway and log_write_mutex_ is already held. Otherwise + // if manual_wal_flush_ is enabled we need to protect log_writer->AddRecord + // from possible concurrent calls via the FlushWAL by the application. + const bool needs_locking = manual_wal_flush_ && !two_write_queues_; + // Due to performance cocerns of missed branch prediction penalize the new + // manual_wal_flush_ feature (by UNLIKELY) instead of the more common case + // when we do not need any locking. + if (UNLIKELY(needs_locking)) { + log_write_mutex_.Lock(); + } + IOStatus io_s = log_writer->AddRecord(log_entry, rate_limiter_priority); + + if (UNLIKELY(needs_locking)) { + log_write_mutex_.Unlock(); + } + if (log_used != nullptr) { + *log_used = logfile_number_; + } + total_log_size_ += log_entry.size(); + log_file_number_size.AddSize(*log_size); + log_empty_ = false; + return io_s; +} + +IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, + log::Writer* log_writer, uint64_t* log_used, + bool need_log_sync, bool need_log_dir_sync, + SequenceNumber sequence, + LogFileNumberSize& log_file_number_size) { + IOStatus io_s; + assert(!two_write_queues_); + assert(!write_group.leader->disable_wal); + // Same holds for all in the batch group + size_t write_with_wal = 0; + WriteBatch* to_be_cached_state = nullptr; + WriteBatch* merged_batch; + io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch_, &merged_batch, + &write_with_wal, &to_be_cached_state)); + if (UNLIKELY(!io_s.ok())) { + return io_s; + } + + if (merged_batch == write_group.leader->batch) { + write_group.leader->log_used = logfile_number_; + } else if (write_with_wal > 1) { + for (auto writer : write_group) { + writer->log_used = logfile_number_; + } + } + + WriteBatchInternal::SetSequence(merged_batch, sequence); + + uint64_t log_size; + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + write_group.leader->rate_limiter_priority, + log_file_number_size); + if (to_be_cached_state) { + cached_recoverable_state_ = *to_be_cached_state; + cached_recoverable_state_empty_ = false; + } + + if (io_s.ok() && need_log_sync) { + StopWatch sw(immutable_db_options_.clock, stats_, WAL_FILE_SYNC_MICROS); + // It's safe to access logs_ with unlocked mutex_ here because: + // - we've set getting_synced=true for all logs, + // so other threads won't pop from logs_ while we're here, + // - only writer thread can push to logs_, and we're in + // writer thread, so no one will push to logs_, + // - as long as other threads don't modify it, it's safe to read + // from std::deque from multiple threads concurrently. + // + // Sync operation should work with locked log_write_mutex_, because: + // when DBOptions.manual_wal_flush_ is set, + // FlushWAL function will be invoked by another thread. + // if without locked log_write_mutex_, the log file may get data + // corruption + + const bool needs_locking = manual_wal_flush_ && !two_write_queues_; + if (UNLIKELY(needs_locking)) { + log_write_mutex_.Lock(); + } + + for (auto& log : logs_) { + io_s = log.writer->file()->Sync(immutable_db_options_.use_fsync); + if (!io_s.ok()) { + break; + } + } + + if (UNLIKELY(needs_locking)) { + log_write_mutex_.Unlock(); + } + + if (io_s.ok() && need_log_dir_sync) { + // We only sync WAL directory the first time WAL syncing is + // requested, so that in case users never turn on WAL sync, + // we can avoid the disk I/O in the write code path. + io_s = directories_.GetWalDir()->FsyncWithDirOptions( + IOOptions(), nullptr, + DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); + } + } + + if (merged_batch == &tmp_batch_) { + tmp_batch_.Clear(); + } + if (io_s.ok()) { + auto stats = default_cf_internal_stats_; + if (need_log_sync) { + stats->AddDBStats(InternalStats::kIntStatsWalFileSynced, 1); + RecordTick(stats_, WAL_FILE_SYNCED); + } + stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size); + RecordTick(stats_, WAL_FILE_BYTES, log_size); + stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal); + RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); + } + return io_s; +} + +IOStatus DBImpl::ConcurrentWriteToWAL( + const WriteThread::WriteGroup& write_group, uint64_t* log_used, + SequenceNumber* last_sequence, size_t seq_inc) { + IOStatus io_s; + + assert(two_write_queues_ || immutable_db_options_.unordered_write); + assert(!write_group.leader->disable_wal); + // Same holds for all in the batch group + WriteBatch tmp_batch; + size_t write_with_wal = 0; + WriteBatch* to_be_cached_state = nullptr; + WriteBatch* merged_batch; + io_s = status_to_io_status(MergeBatch(write_group, &tmp_batch, &merged_batch, + &write_with_wal, &to_be_cached_state)); + if (UNLIKELY(!io_s.ok())) { + return io_s; + } + + // We need to lock log_write_mutex_ since logs_ and alive_log_files might be + // pushed back concurrently + log_write_mutex_.Lock(); + if (merged_batch == write_group.leader->batch) { + write_group.leader->log_used = logfile_number_; + } else if (write_with_wal > 1) { + for (auto writer : write_group) { + writer->log_used = logfile_number_; + } + } + *last_sequence = versions_->FetchAddLastAllocatedSequence(seq_inc); + auto sequence = *last_sequence + 1; + WriteBatchInternal::SetSequence(merged_batch, sequence); + + log::Writer* log_writer = logs_.back().writer; + LogFileNumberSize& log_file_number_size = alive_log_files_.back(); + + assert(log_writer->get_log_number() == log_file_number_size.number); + + uint64_t log_size; + io_s = WriteToWAL(*merged_batch, log_writer, log_used, &log_size, + write_group.leader->rate_limiter_priority, + log_file_number_size); + if (to_be_cached_state) { + cached_recoverable_state_ = *to_be_cached_state; + cached_recoverable_state_empty_ = false; + } + log_write_mutex_.Unlock(); + + if (io_s.ok()) { + const bool concurrent = true; + auto stats = default_cf_internal_stats_; + stats->AddDBStats(InternalStats::kIntStatsWalFileBytes, log_size, + concurrent); + RecordTick(stats_, WAL_FILE_BYTES, log_size); + stats->AddDBStats(InternalStats::kIntStatsWriteWithWal, write_with_wal, + concurrent); + RecordTick(stats_, WRITE_WITH_WAL, write_with_wal); + } + return io_s; +} + +Status DBImpl::WriteRecoverableState() { + mutex_.AssertHeld(); + if (!cached_recoverable_state_empty_) { + bool dont_care_bool; + SequenceNumber next_seq; + if (two_write_queues_) { + log_write_mutex_.Lock(); + } + SequenceNumber seq; + if (two_write_queues_) { + seq = versions_->FetchAddLastAllocatedSequence(0); + } else { + seq = versions_->LastSequence(); + } + WriteBatchInternal::SetSequence(&cached_recoverable_state_, seq + 1); + auto status = WriteBatchInternal::InsertInto( + &cached_recoverable_state_, column_family_memtables_.get(), + &flush_scheduler_, &trim_history_scheduler_, true, + 0 /*recovery_log_number*/, this, false /* concurrent_memtable_writes */, + &next_seq, &dont_care_bool, seq_per_batch_); + auto last_seq = next_seq - 1; + if (two_write_queues_) { + versions_->FetchAddLastAllocatedSequence(last_seq - seq); + versions_->SetLastPublishedSequence(last_seq); + } + versions_->SetLastSequence(last_seq); + if (two_write_queues_) { + log_write_mutex_.Unlock(); + } + if (status.ok() && recoverable_state_pre_release_callback_) { + const bool DISABLE_MEMTABLE = true; + for (uint64_t sub_batch_seq = seq + 1; + sub_batch_seq < next_seq && status.ok(); sub_batch_seq++) { + uint64_t const no_log_num = 0; + // Unlock it since the callback might end up locking mutex. e.g., + // AddCommitted -> AdvanceMaxEvictedSeq -> GetSnapshotListFromDB + mutex_.Unlock(); + status = recoverable_state_pre_release_callback_->Callback( + sub_batch_seq, !DISABLE_MEMTABLE, no_log_num, 0, 1); + mutex_.Lock(); + } + } + if (status.ok()) { + cached_recoverable_state_.Clear(); + cached_recoverable_state_empty_ = true; + } + return status; + } + return Status::OK(); +} + +void DBImpl::SelectColumnFamiliesForAtomicFlush( + autovector<ColumnFamilyData*>* cfds) { + for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() || + !cached_recoverable_state_empty_.load()) { + cfds->push_back(cfd); + } + } +} + +// Assign sequence number for atomic flush. +void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) { + assert(immutable_db_options_.atomic_flush); + auto seq = versions_->LastSequence(); + for (auto cfd : cfds) { + cfd->imm()->AssignAtomicFlushSeq(seq); + } +} + +Status DBImpl::SwitchWAL(WriteContext* write_context) { + mutex_.AssertHeld(); + assert(write_context != nullptr); + Status status; + + if (alive_log_files_.begin()->getting_flushed) { + return status; + } + + auto oldest_alive_log = alive_log_files_.begin()->number; + bool flush_wont_release_oldest_log = false; + if (allow_2pc()) { + auto oldest_log_with_uncommitted_prep = + logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep(); + + assert(oldest_log_with_uncommitted_prep == 0 || + oldest_log_with_uncommitted_prep >= oldest_alive_log); + if (oldest_log_with_uncommitted_prep > 0 && + oldest_log_with_uncommitted_prep == oldest_alive_log) { + if (unable_to_release_oldest_log_) { + // we already attempted to flush all column families dependent on + // the oldest alive log but the log still contained uncommitted + // transactions so there is still nothing that we can do. + return status; + } else { + ROCKS_LOG_WARN( + immutable_db_options_.info_log, + "Unable to release oldest log due to uncommitted transaction"); + unable_to_release_oldest_log_ = true; + flush_wont_release_oldest_log = true; + } + } + } + if (!flush_wont_release_oldest_log) { + // we only mark this log as getting flushed if we have successfully + // flushed all data in this log. If this log contains outstanding prepared + // transactions then we cannot flush this log until those transactions are + // commited. + unable_to_release_oldest_log_ = false; + alive_log_files_.begin()->getting_flushed = true; + } + + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Flushing all column families with data in WAL number %" PRIu64 + ". Total log size is %" PRIu64 " while max_total_wal_size is %" PRIu64, + oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize()); + // no need to refcount because drop is happening in write thread, so can't + // happen while we're in the write thread + autovector<ColumnFamilyData*> cfds; + if (immutable_db_options_.atomic_flush) { + SelectColumnFamiliesForAtomicFlush(&cfds); + } else { + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if (cfd->OldestLogToKeep() <= oldest_alive_log) { + cfds.push_back(cfd); + } + } + MaybeFlushStatsCF(&cfds); + } + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + for (const auto cfd : cfds) { + cfd->Ref(); + status = SwitchMemtable(cfd, write_context); + cfd->UnrefAndTryDelete(); + if (!status.ok()) { + break; + } + } + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + + if (status.ok()) { + if (immutable_db_options_.atomic_flush) { + AssignAtomicFlushSeq(cfds); + } + for (auto cfd : cfds) { + cfd->imm()->FlushRequested(); + if (!immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest({cfd}, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWalFull); + } + } + if (immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWalFull); + } + MaybeScheduleFlushOrCompaction(); + } + return status; +} + +Status DBImpl::HandleWriteBufferManagerFlush(WriteContext* write_context) { + mutex_.AssertHeld(); + assert(write_context != nullptr); + Status status; + + // Before a new memtable is added in SwitchMemtable(), + // write_buffer_manager_->ShouldFlush() will keep returning true. If another + // thread is writing to another DB with the same write buffer, they may also + // be flushed. We may end up with flushing much more DBs than needed. It's + // suboptimal but still correct. + // no need to refcount because drop is happening in write thread, so can't + // happen while we're in the write thread + autovector<ColumnFamilyData*> cfds; + if (immutable_db_options_.atomic_flush) { + SelectColumnFamiliesForAtomicFlush(&cfds); + } else { + ColumnFamilyData* cfd_picked = nullptr; + SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber; + + for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } + if (!cfd->mem()->IsEmpty() && !cfd->imm()->IsFlushPendingOrRunning()) { + // We only consider flush on CFs with bytes in the mutable memtable, + // and no immutable memtables for which flush has yet to finish. If + // we triggered flush on CFs already trying to flush, we would risk + // creating too many immutable memtables leading to write stalls. + uint64_t seq = cfd->mem()->GetCreationSeq(); + if (cfd_picked == nullptr || seq < seq_num_for_cf_picked) { + cfd_picked = cfd; + seq_num_for_cf_picked = seq; + } + } + } + if (cfd_picked != nullptr) { + cfds.push_back(cfd_picked); + } + MaybeFlushStatsCF(&cfds); + } + if (!cfds.empty()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Flushing triggered to alleviate write buffer memory usage. Write " + "buffer is using %" ROCKSDB_PRIszt + " bytes out of a total of %" ROCKSDB_PRIszt ".", + write_buffer_manager_->memory_usage(), + write_buffer_manager_->buffer_size()); + } + + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + for (const auto cfd : cfds) { + if (cfd->mem()->IsEmpty()) { + continue; + } + cfd->Ref(); + status = SwitchMemtable(cfd, write_context); + cfd->UnrefAndTryDelete(); + if (!status.ok()) { + break; + } + } + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + + if (status.ok()) { + if (immutable_db_options_.atomic_flush) { + AssignAtomicFlushSeq(cfds); + } + for (const auto cfd : cfds) { + cfd->imm()->FlushRequested(); + if (!immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest({cfd}, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); + } + } + if (immutable_db_options_.atomic_flush) { + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager); + } + MaybeScheduleFlushOrCompaction(); + } + return status; +} + +uint64_t DBImpl::GetMaxTotalWalSize() const { + uint64_t max_total_wal_size = + max_total_wal_size_.load(std::memory_order_acquire); + if (max_total_wal_size > 0) { + return max_total_wal_size; + } + return 4 * max_total_in_memory_state_.load(std::memory_order_acquire); +} + +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +Status DBImpl::DelayWrite(uint64_t num_bytes, + const WriteOptions& write_options) { + uint64_t time_delayed = 0; + bool delayed = false; + { + StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL, + &time_delayed); + uint64_t delay = + write_controller_.GetDelay(immutable_db_options_.clock, num_bytes); + TEST_SYNC_POINT("DBImpl::DelayWrite:Start"); + if (delay > 0) { + if (write_options.no_slowdown) { + return Status::Incomplete("Write stall"); + } + TEST_SYNC_POINT("DBImpl::DelayWrite:Sleep"); + + // Notify write_thread_ about the stall so it can setup a barrier and + // fail any pending writers with no_slowdown + write_thread_.BeginWriteStall(); + mutex_.Unlock(); + TEST_SYNC_POINT("DBImpl::DelayWrite:BeginWriteStallDone"); + // We will delay the write until we have slept for `delay` microseconds + // or we don't need a delay anymore. We check for cancellation every 1ms + // (slightly longer because WriteController minimum delay is 1ms, in + // case of sleep imprecision, rounding, etc.) + const uint64_t kDelayInterval = 1001; + uint64_t stall_end = sw.start_time() + delay; + while (write_controller_.NeedsDelay()) { + if (immutable_db_options_.clock->NowMicros() >= stall_end) { + // We already delayed this write `delay` microseconds + break; + } + + delayed = true; + // Sleep for 0.001 seconds + immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval); + } + mutex_.Lock(); + write_thread_.EndWriteStall(); + } + + // Don't wait if there's a background error, even if its a soft error. We + // might wait here indefinitely as the background compaction may never + // finish successfully, resulting in the stall condition lasting + // indefinitely + while (error_handler_.GetBGError().ok() && write_controller_.IsStopped() && + !shutting_down_.load(std::memory_order_relaxed)) { + if (write_options.no_slowdown) { + return Status::Incomplete("Write stall"); + } + delayed = true; + + // Notify write_thread_ about the stall so it can setup a barrier and + // fail any pending writers with no_slowdown + write_thread_.BeginWriteStall(); + TEST_SYNC_POINT("DBImpl::DelayWrite:Wait"); + bg_cv_.Wait(); + write_thread_.EndWriteStall(); + } + } + assert(!delayed || !write_options.no_slowdown); + if (delayed) { + default_cf_internal_stats_->AddDBStats( + InternalStats::kIntStatsWriteStallMicros, time_delayed); + RecordTick(stats_, STALL_MICROS, time_delayed); + } + + // If DB is not in read-only mode and write_controller is not stopping + // writes, we can ignore any background errors and allow the write to + // proceed + Status s; + if (write_controller_.IsStopped()) { + if (!shutting_down_.load(std::memory_order_relaxed)) { + // If writes are still stopped and db not shutdown, it means we bailed + // due to a background error + s = Status::Incomplete(error_handler_.GetBGError().ToString()); + } else { + s = Status::ShutdownInProgress("stalled writes"); + } + } + if (error_handler_.IsDBStopped()) { + s = error_handler_.GetBGError(); + } + return s; +} + +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +void DBImpl::WriteBufferManagerStallWrites() { + mutex_.AssertHeld(); + // First block future writer threads who want to add themselves to the queue + // of WriteThread. + write_thread_.BeginWriteStall(); + mutex_.Unlock(); + + // Change the state to State::Blocked. + static_cast<WBMStallInterface*>(wbm_stall_.get()) + ->SetState(WBMStallInterface::State::BLOCKED); + // Then WriteBufferManager will add DB instance to its queue + // and block this thread by calling WBMStallInterface::Block(). + write_buffer_manager_->BeginWriteStall(wbm_stall_.get()); + wbm_stall_->Block(); + + mutex_.Lock(); + // Stall has ended. Signal writer threads so that they can add + // themselves to the WriteThread queue for writes. + write_thread_.EndWriteStall(); +} + +Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options, + WriteBatch* my_batch) { + assert(write_options.low_pri); + // This is called outside the DB mutex. Although it is safe to make the call, + // the consistency condition is not guaranteed to hold. It's OK to live with + // it in this case. + // If we need to speed compaction, it means the compaction is left behind + // and we start to limit low pri writes to a limit. + if (write_controller_.NeedSpeedupCompaction()) { + if (allow_2pc() && (my_batch->HasCommit() || my_batch->HasRollback())) { + // For 2PC, we only rate limit prepare, not commit. + return Status::OK(); + } + if (write_options.no_slowdown) { + return Status::Incomplete("Low priority write stall"); + } else { + assert(my_batch != nullptr); + // Rate limit those writes. The reason that we don't completely wait + // is that in case the write is heavy, low pri writes may never have + // a chance to run. Now we guarantee we are still slowly making + // progress. + PERF_TIMER_GUARD(write_delay_time); + write_controller_.low_pri_rate_limiter()->Request( + my_batch->GetDataSize(), Env::IO_HIGH, nullptr /* stats */, + RateLimiter::OpType::kWrite); + } + } + return Status::OK(); +} + +void DBImpl::MaybeFlushStatsCF(autovector<ColumnFamilyData*>* cfds) { + assert(cfds != nullptr); + if (!cfds->empty() && immutable_db_options_.persist_stats_to_disk) { + ColumnFamilyData* cfd_stats = + versions_->GetColumnFamilySet()->GetColumnFamily( + kPersistentStatsColumnFamilyName); + if (cfd_stats != nullptr && !cfd_stats->mem()->IsEmpty()) { + for (ColumnFamilyData* cfd : *cfds) { + if (cfd == cfd_stats) { + // stats CF already included in cfds + return; + } + } + // force flush stats CF when its log number is less than all other CF's + // log numbers + bool force_flush_stats_cf = true; + for (auto* loop_cfd : *versions_->GetColumnFamilySet()) { + if (loop_cfd == cfd_stats) { + continue; + } + if (loop_cfd->GetLogNumber() <= cfd_stats->GetLogNumber()) { + force_flush_stats_cf = false; + } + } + if (force_flush_stats_cf) { + cfds->push_back(cfd_stats); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Force flushing stats CF with automated flush " + "to avoid holding old logs"); + } + } + } +} + +Status DBImpl::TrimMemtableHistory(WriteContext* context) { + autovector<ColumnFamilyData*> cfds; + ColumnFamilyData* tmp_cfd; + while ((tmp_cfd = trim_history_scheduler_.TakeNextColumnFamily()) != + nullptr) { + cfds.push_back(tmp_cfd); + } + for (auto& cfd : cfds) { + autovector<MemTable*> to_delete; + bool trimmed = cfd->imm()->TrimHistory(&context->memtables_to_free_, + cfd->mem()->MemoryAllocatedBytes()); + if (trimmed) { + context->superversion_context.NewSuperVersion(); + assert(context->superversion_context.new_superversion.get() != nullptr); + cfd->InstallSuperVersion(&context->superversion_context, &mutex_); + } + + if (cfd->UnrefAndTryDelete()) { + cfd = nullptr; + } + } + return Status::OK(); +} + +Status DBImpl::ScheduleFlushes(WriteContext* context) { + autovector<ColumnFamilyData*> cfds; + if (immutable_db_options_.atomic_flush) { + SelectColumnFamiliesForAtomicFlush(&cfds); + for (auto cfd : cfds) { + cfd->Ref(); + } + flush_scheduler_.Clear(); + } else { + ColumnFamilyData* tmp_cfd; + while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) { + cfds.push_back(tmp_cfd); + } + MaybeFlushStatsCF(&cfds); + } + Status status; + WriteThread::Writer nonmem_w; + if (two_write_queues_) { + nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_); + } + + for (auto& cfd : cfds) { + if (!cfd->mem()->IsEmpty()) { + status = SwitchMemtable(cfd, context); + } + if (cfd->UnrefAndTryDelete()) { + cfd = nullptr; + } + if (!status.ok()) { + break; + } + } + + if (two_write_queues_) { + nonmem_write_thread_.ExitUnbatched(&nonmem_w); + } + + if (status.ok()) { + if (immutable_db_options_.atomic_flush) { + AssignAtomicFlushSeq(cfds); + FlushRequest flush_req; + GenerateFlushRequest(cfds, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + } else { + for (auto* cfd : cfds) { + FlushRequest flush_req; + GenerateFlushRequest({cfd}, &flush_req); + SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull); + } + } + MaybeScheduleFlushOrCompaction(); + } + return status; +} + +#ifndef ROCKSDB_LITE +void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, + const MemTableInfo& mem_table_info) { + if (immutable_db_options_.listeners.size() == 0U) { + return; + } + if (shutting_down_.load(std::memory_order_acquire)) { + return; + } + + mutex_.Unlock(); + for (auto listener : immutable_db_options_.listeners) { + listener->OnMemTableSealed(mem_table_info); + } + mutex_.Lock(); +} +#endif // ROCKSDB_LITE + +// REQUIRES: mutex_ is held +// REQUIRES: this thread is currently at the front of the writer queue +// REQUIRES: this thread is currently at the front of the 2nd writer queue if +// two_write_queues_ is true (This is to simplify the reasoning.) +Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { + mutex_.AssertHeld(); + log::Writer* new_log = nullptr; + MemTable* new_mem = nullptr; + IOStatus io_s; + + // Recoverable state is persisted in WAL. After memtable switch, WAL might + // be deleted, so we write the state to memtable to be persisted as well. + Status s = WriteRecoverableState(); + if (!s.ok()) { + return s; + } + + // Attempt to switch to a new memtable and trigger flush of old. + // Do this without holding the dbmutex lock. + assert(versions_->prev_log_number() == 0); + if (two_write_queues_) { + log_write_mutex_.Lock(); + } + bool creating_new_log = !log_empty_; + if (two_write_queues_) { + log_write_mutex_.Unlock(); + } + uint64_t recycle_log_number = 0; + if (creating_new_log && immutable_db_options_.recycle_log_file_num && + !log_recycle_files_.empty()) { + recycle_log_number = log_recycle_files_.front(); + } + uint64_t new_log_number = + creating_new_log ? versions_->NewFileNumber() : logfile_number_; + const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); + + // Set memtable_info for memtable sealed callback +#ifndef ROCKSDB_LITE + MemTableInfo memtable_info; + memtable_info.cf_name = cfd->GetName(); + memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber(); + memtable_info.earliest_seqno = cfd->mem()->GetEarliestSequenceNumber(); + memtable_info.num_entries = cfd->mem()->num_entries(); + memtable_info.num_deletes = cfd->mem()->num_deletes(); +#endif // ROCKSDB_LITE + // Log this later after lock release. It may be outdated, e.g., if background + // flush happens before logging, but that should be ok. + int num_imm_unflushed = cfd->imm()->NumNotFlushed(); + const auto preallocate_block_size = + GetWalPreallocateBlockSize(mutable_cf_options.write_buffer_size); + mutex_.Unlock(); + if (creating_new_log) { + // TODO: Write buffer size passed in should be max of all CF's instead + // of mutable_cf_options.write_buffer_size. + io_s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size, + &new_log); + if (s.ok()) { + s = io_s; + } + } + if (s.ok()) { + SequenceNumber seq = versions_->LastSequence(); + new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + context->superversion_context.NewSuperVersion(); + } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "[%s] New memtable created with log file: #%" PRIu64 + ". Immutable memtables: %d.\n", + cfd->GetName().c_str(), new_log_number, num_imm_unflushed); + // There should be no concurrent write as the thread is at the front of + // writer queue + cfd->mem()->ConstructFragmentedRangeTombstones(); + + mutex_.Lock(); + if (recycle_log_number != 0) { + // Since renaming the file is done outside DB mutex, we need to ensure + // concurrent full purges don't delete the file while we're recycling it. + // To achieve that we hold the old log number in the recyclable list until + // after it has been renamed. + assert(log_recycle_files_.front() == recycle_log_number); + log_recycle_files_.pop_front(); + } + if (s.ok() && creating_new_log) { + InstrumentedMutexLock l(&log_write_mutex_); + assert(new_log != nullptr); + if (!logs_.empty()) { + // Alway flush the buffer of the last log before switching to a new one + log::Writer* cur_log_writer = logs_.back().writer; + if (error_handler_.IsRecoveryInProgress()) { + // In recovery path, we force another try of writing WAL buffer. + cur_log_writer->file()->reset_seen_error(); + } + io_s = cur_log_writer->WriteBuffer(); + if (s.ok()) { + s = io_s; + } + if (!s.ok()) { + ROCKS_LOG_WARN(immutable_db_options_.info_log, + "[%s] Failed to switch from #%" PRIu64 " to #%" PRIu64 + " WAL file\n", + cfd->GetName().c_str(), cur_log_writer->get_log_number(), + new_log_number); + } + } + if (s.ok()) { + logfile_number_ = new_log_number; + log_empty_ = true; + log_dir_synced_ = false; + logs_.emplace_back(logfile_number_, new_log); + alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); + } + } + + if (!s.ok()) { + // how do we fail if we're not creating new log? + assert(creating_new_log); + delete new_mem; + delete new_log; + context->superversion_context.new_superversion.reset(); + // We may have lost data from the WritableFileBuffer in-memory buffer for + // the current log, so treat it as a fatal error and set bg_error + if (!io_s.ok()) { + error_handler_.SetBGError(io_s, BackgroundErrorReason::kMemTable); + } else { + error_handler_.SetBGError(s, BackgroundErrorReason::kMemTable); + } + // Read back bg_error in order to get the right severity + s = error_handler_.GetBGError(); + return s; + } + + bool empty_cf_updated = false; + if (immutable_db_options_.track_and_verify_wals_in_manifest && + !immutable_db_options_.allow_2pc && creating_new_log) { + // In non-2pc mode, WALs become obsolete if they do not contain unflushed + // data. Updating the empty CF's log number might cause some WALs to become + // obsolete. So we should track the WAL obsoletion event before actually + // updating the empty CF's log number. + uint64_t min_wal_number_to_keep = + versions_->PreComputeMinLogNumberWithUnflushedData(logfile_number_); + if (min_wal_number_to_keep > + versions_->GetWalSet().GetMinWalNumberToKeep()) { + // Get a snapshot of the empty column families. + // LogAndApply may release and reacquire db + // mutex, during that period, column family may become empty (e.g. its + // flush succeeds), then it affects the computed min_log_number_to_keep, + // so we take a snapshot for consistency of column family data + // status. If a column family becomes non-empty afterwards, its active log + // should still be the created new log, so the min_log_number_to_keep is + // not affected. + autovector<ColumnFamilyData*> empty_cfs; + for (auto cf : *versions_->GetColumnFamilySet()) { + if (cf->IsEmpty()) { + empty_cfs.push_back(cf); + } + } + + VersionEdit wal_deletion; + wal_deletion.DeleteWalsBefore(min_wal_number_to_keep); + s = versions_->LogAndApplyToDefaultColumnFamily(&wal_deletion, &mutex_, + directories_.GetDbDir()); + if (!s.ok() && versions_->io_status().IsIOError()) { + s = error_handler_.SetBGError(versions_->io_status(), + BackgroundErrorReason::kManifestWrite); + } + if (!s.ok()) { + return s; + } + + for (auto cf : empty_cfs) { + if (cf->IsEmpty()) { + cf->SetLogNumber(logfile_number_); + // MEMPURGE: No need to change this, because new adds + // should still receive new sequence numbers. + cf->mem()->SetCreationSeq(versions_->LastSequence()); + } // cf may become non-empty. + } + empty_cf_updated = true; + } + } + if (!empty_cf_updated) { + for (auto cf : *versions_->GetColumnFamilySet()) { + // all this is just optimization to delete logs that + // are no longer needed -- if CF is empty, that means it + // doesn't need that particular log to stay alive, so we just + // advance the log number. no need to persist this in the manifest + if (cf->IsEmpty()) { + if (creating_new_log) { + cf->SetLogNumber(logfile_number_); + } + cf->mem()->SetCreationSeq(versions_->LastSequence()); + } + } + } + + cfd->mem()->SetNextLogNumber(logfile_number_); + assert(new_mem != nullptr); + cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_); + new_mem->Ref(); + cfd->SetMemtable(new_mem); + InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context, + mutable_cf_options); + +#ifndef ROCKSDB_LITE + // Notify client that memtable is sealed, now that we have successfully + // installed a new memtable + NotifyOnMemTableSealed(cfd, memtable_info); +#endif // ROCKSDB_LITE + // It is possible that we got here without checking the value of i_os, but + // that is okay. If we did, it most likely means that s was already an error. + // In any case, ignore any unchecked error for i_os here. + io_s.PermitUncheckedError(); + return s; +} + +size_t DBImpl::GetWalPreallocateBlockSize(uint64_t write_buffer_size) const { + mutex_.AssertHeld(); + size_t bsize = + static_cast<size_t>(write_buffer_size / 10 + write_buffer_size); + // Some users might set very high write_buffer_size and rely on + // max_total_wal_size or other parameters to control the WAL size. + if (mutable_db_options_.max_total_wal_size > 0) { + bsize = std::min<size_t>( + bsize, static_cast<size_t>(mutable_db_options_.max_total_wal_size)); + } + if (immutable_db_options_.db_write_buffer_size > 0) { + bsize = std::min<size_t>(bsize, immutable_db_options_.db_write_buffer_size); + } + if (immutable_db_options_.write_buffer_manager && + immutable_db_options_.write_buffer_manager->enabled()) { + bsize = std::min<size_t>( + bsize, immutable_db_options_.write_buffer_manager->buffer_size()); + } + + return bsize; +} + +// Default implementations of convenience methods that subclasses of DB +// can call if they wish +Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + // Pre-allocate size of write batch conservatively. + // 8 bytes are taken by header, 4 bytes for count, 1 byte for type, + // and we allocate 11 extra bytes for key length, as well as value length. + WriteBatch batch(key.size() + value.size() + 24, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); + Status s = batch.Put(column_family, key, value); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.Put(column_family, key, ts, value); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::PutEntity(const WriteOptions& options, + ColumnFamilyHandle* column_family, const Slice& key, + const WideColumns& columns) { + const ColumnFamilyHandle* const default_cf = DefaultColumnFamily(); + assert(default_cf); + + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + + WriteBatch batch(/* reserved_bytes */ 0, /* max_bytes */ 0, + options.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + + const Status s = batch.PutEntity(column_family, key, columns); + if (!s.ok()) { + return s; + } + + return Write(options, &batch); +} + +Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key) { + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); + Status s = batch.Delete(column_family, key); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.Delete(column_family, key, ts); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::SingleDelete(const WriteOptions& opt, + ColumnFamilyHandle* column_family, const Slice& key) { + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); + Status s = batch.SingleDelete(column_family, key); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::SingleDelete(const WriteOptions& opt, + ColumnFamilyHandle* column_family, const Slice& key, + const Slice& ts) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.SingleDelete(column_family, key, ts); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::DeleteRange(const WriteOptions& opt, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key) { + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); + Status s = batch.DeleteRange(column_family, begin_key, end_key); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::DeleteRange(const WriteOptions& opt, + ColumnFamilyHandle* column_family, + const Slice& begin_key, const Slice& end_key, + const Slice& ts) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.DeleteRange(column_family, begin_key, end_key, ts); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& value) { + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, 0 /* default_cf_ts_sz */); + Status s = batch.Merge(column_family, key, value); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family, + const Slice& key, const Slice& ts, const Slice& value) { + ColumnFamilyHandle* default_cf = DefaultColumnFamily(); + assert(default_cf); + const Comparator* const default_cf_ucmp = default_cf->GetComparator(); + assert(default_cf_ucmp); + WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, + opt.protection_bytes_per_key, + default_cf_ucmp->timestamp_size()); + Status s = batch.Merge(column_family, key, ts, value); + if (!s.ok()) { + return s; + } + return Write(opt, &batch); +} + +} // namespace ROCKSDB_NAMESPACE |