diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/db_impl/db_impl_secondary.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/db_impl/db_impl_secondary.cc')
-rw-r--r-- | src/rocksdb/db/db_impl/db_impl_secondary.cc | 967 |
1 files changed, 967 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_impl/db_impl_secondary.cc b/src/rocksdb/db/db_impl/db_impl_secondary.cc new file mode 100644 index 000000000..5189d17d9 --- /dev/null +++ b/src/rocksdb/db/db_impl/db_impl_secondary.cc @@ -0,0 +1,967 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/db_impl/db_impl_secondary.h" + +#include <cinttypes> + +#include "db/arena_wrapped_db_iter.h" +#include "db/merge_context.h" +#include "logging/auto_roll_logger.h" +#include "logging/logging.h" +#include "monitoring/perf_context_imp.h" +#include "rocksdb/configurable.h" +#include "util/cast_util.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +DBImplSecondary::DBImplSecondary(const DBOptions& db_options, + const std::string& dbname, + std::string secondary_path) + : DBImpl(db_options, dbname, false, true, true), + secondary_path_(std::move(secondary_path)) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Opening the db in secondary mode"); + LogFlush(immutable_db_options_.info_log); +} + +DBImplSecondary::~DBImplSecondary() {} + +Status DBImplSecondary::Recover( + const std::vector<ColumnFamilyDescriptor>& column_families, + bool /*readonly*/, bool /*error_if_wal_file_exists*/, + bool /*error_if_data_exists_in_wals*/, uint64_t*, + RecoveryContext* /*recovery_ctx*/) { + mutex_.AssertHeld(); + + JobContext job_context(0); + Status s; + s = static_cast<ReactiveVersionSet*>(versions_.get()) + ->Recover(column_families, &manifest_reader_, &manifest_reporter_, + &manifest_reader_status_); + if (!s.ok()) { + if (manifest_reader_status_) { + manifest_reader_status_->PermitUncheckedError(); + } + return s; + } + if (immutable_db_options_.paranoid_checks && s.ok()) { + s = CheckConsistency(); + } + // Initial max_total_in_memory_state_ before recovery logs. + max_total_in_memory_state_ = 0; + for (auto cfd : *versions_->GetColumnFamilySet()) { + auto* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + max_total_in_memory_state_ += mutable_cf_options->write_buffer_size * + mutable_cf_options->max_write_buffer_number; + } + if (s.ok()) { + default_cf_handle_ = new ColumnFamilyHandleImpl( + versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_); + default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats(); + + std::unordered_set<ColumnFamilyData*> cfds_changed; + s = FindAndRecoverLogFiles(&cfds_changed, &job_context); + } + + if (s.IsPathNotFound()) { + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Secondary tries to read WAL, but WAL file(s) have already " + "been purged by primary."); + s = Status::OK(); + } + // TODO: update options_file_number_ needed? + + job_context.Clean(); + return s; +} + +// find new WAL and apply them in order to the secondary instance +Status DBImplSecondary::FindAndRecoverLogFiles( + std::unordered_set<ColumnFamilyData*>* cfds_changed, + JobContext* job_context) { + assert(nullptr != cfds_changed); + assert(nullptr != job_context); + Status s; + std::vector<uint64_t> logs; + s = FindNewLogNumbers(&logs); + if (s.ok() && !logs.empty()) { + SequenceNumber next_sequence(kMaxSequenceNumber); + s = RecoverLogFiles(logs, &next_sequence, cfds_changed, job_context); + } + return s; +} + +// List wal_dir and find all new WALs, return these log numbers +Status DBImplSecondary::FindNewLogNumbers(std::vector<uint64_t>* logs) { + assert(logs != nullptr); + std::vector<std::string> filenames; + Status s; + IOOptions io_opts; + io_opts.do_not_recurse = true; + s = immutable_db_options_.fs->GetChildren(immutable_db_options_.GetWalDir(), + io_opts, &filenames, + /*IODebugContext*=*/nullptr); + if (s.IsNotFound()) { + return Status::InvalidArgument("Failed to open wal_dir", + immutable_db_options_.GetWalDir()); + } else if (!s.ok()) { + return s; + } + + // if log_readers_ is non-empty, it means we have applied all logs with log + // numbers smaller than the smallest log in log_readers_, so there is no + // need to pass these logs to RecoverLogFiles + uint64_t log_number_min = 0; + if (!log_readers_.empty()) { + log_number_min = log_readers_.begin()->first; + } + for (size_t i = 0; i < filenames.size(); i++) { + uint64_t number; + FileType type; + if (ParseFileName(filenames[i], &number, &type) && type == kWalFile && + number >= log_number_min) { + logs->push_back(number); + } + } + // Recover logs in the order that they were generated + if (!logs->empty()) { + std::sort(logs->begin(), logs->end()); + } + return s; +} + +Status DBImplSecondary::MaybeInitLogReader( + uint64_t log_number, log::FragmentBufferedReader** log_reader) { + auto iter = log_readers_.find(log_number); + // make sure the log file is still present + if (iter == log_readers_.end() || + iter->second->reader_->GetLogNumber() != log_number) { + // delete the obsolete log reader if log number mismatch + if (iter != log_readers_.end()) { + log_readers_.erase(iter); + } + // initialize log reader from log_number + // TODO: min_log_number_to_keep_2pc check needed? + // Open the log file + std::string fname = + LogFileName(immutable_db_options_.GetWalDir(), log_number); + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Recovering log #%" PRIu64 " mode %d", log_number, + static_cast<int>(immutable_db_options_.wal_recovery_mode)); + + std::unique_ptr<SequentialFileReader> file_reader; + { + std::unique_ptr<FSSequentialFile> file; + Status status = fs_->NewSequentialFile( + fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr); + if (!status.ok()) { + *log_reader = nullptr; + return status; + } + file_reader.reset(new SequentialFileReader( + std::move(file), fname, immutable_db_options_.log_readahead_size, + io_tracer_)); + } + + // Create the log reader. + LogReaderContainer* log_reader_container = new LogReaderContainer( + env_, immutable_db_options_.info_log, std::move(fname), + std::move(file_reader), log_number); + log_readers_.insert(std::make_pair( + log_number, std::unique_ptr<LogReaderContainer>(log_reader_container))); + } + iter = log_readers_.find(log_number); + assert(iter != log_readers_.end()); + *log_reader = iter->second->reader_; + return Status::OK(); +} + +// After manifest recovery, replay WALs and refresh log_readers_ if necessary +// REQUIRES: log_numbers are sorted in ascending order +Status DBImplSecondary::RecoverLogFiles( + const std::vector<uint64_t>& log_numbers, SequenceNumber* next_sequence, + std::unordered_set<ColumnFamilyData*>* cfds_changed, + JobContext* job_context) { + assert(nullptr != cfds_changed); + assert(nullptr != job_context); + mutex_.AssertHeld(); + Status status; + for (auto log_number : log_numbers) { + log::FragmentBufferedReader* reader = nullptr; + status = MaybeInitLogReader(log_number, &reader); + if (!status.ok()) { + return status; + } + assert(reader != nullptr); + } + for (auto log_number : log_numbers) { + auto it = log_readers_.find(log_number); + assert(it != log_readers_.end()); + log::FragmentBufferedReader* reader = it->second->reader_; + Status* wal_read_status = it->second->status_; + assert(wal_read_status); + // Manually update the file number allocation counter in VersionSet. + versions_->MarkFileNumberUsed(log_number); + + // Determine if we should tolerate incomplete records at the tail end of the + // Read all the records and add to a memtable + std::string scratch; + Slice record; + WriteBatch batch; + + while (reader->ReadRecord(&record, &scratch, + immutable_db_options_.wal_recovery_mode) && + wal_read_status->ok() && status.ok()) { + if (record.size() < WriteBatchInternal::kHeader) { + reader->GetReporter()->Corruption( + record.size(), Status::Corruption("log record too small")); + continue; + } + status = WriteBatchInternal::SetContents(&batch, record); + if (!status.ok()) { + break; + } + SequenceNumber seq_of_batch = WriteBatchInternal::Sequence(&batch); + std::vector<uint32_t> column_family_ids; + status = CollectColumnFamilyIdsFromWriteBatch(batch, &column_family_ids); + if (status.ok()) { + for (const auto id : column_family_ids) { + ColumnFamilyData* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(id); + if (cfd == nullptr) { + continue; + } + if (cfds_changed->count(cfd) == 0) { + cfds_changed->insert(cfd); + } + const std::vector<FileMetaData*>& l0_files = + cfd->current()->storage_info()->LevelFiles(0); + SequenceNumber seq = + l0_files.empty() ? 0 : l0_files.back()->fd.largest_seqno; + // If the write batch's sequence number is smaller than the last + // sequence number of the largest sequence persisted for this column + // family, then its data must reside in an SST that has already been + // added in the prior MANIFEST replay. + if (seq_of_batch <= seq) { + continue; + } + auto curr_log_num = std::numeric_limits<uint64_t>::max(); + if (cfd_to_current_log_.count(cfd) > 0) { + curr_log_num = cfd_to_current_log_[cfd]; + } + // If the active memtable contains records added by replaying an + // earlier WAL, then we need to seal the memtable, add it to the + // immutable memtable list and create a new active memtable. + if (!cfd->mem()->IsEmpty() && + (curr_log_num == std::numeric_limits<uint64_t>::max() || + curr_log_num != log_number)) { + const MutableCFOptions mutable_cf_options = + *cfd->GetLatestMutableCFOptions(); + MemTable* new_mem = + cfd->ConstructNewMemtable(mutable_cf_options, seq_of_batch); + cfd->mem()->SetNextLogNumber(log_number); + cfd->mem()->ConstructFragmentedRangeTombstones(); + cfd->imm()->Add(cfd->mem(), &job_context->memtables_to_free); + new_mem->Ref(); + cfd->SetMemtable(new_mem); + } + } + bool has_valid_writes = false; + status = WriteBatchInternal::InsertInto( + &batch, column_family_memtables_.get(), + nullptr /* flush_scheduler */, nullptr /* trim_history_scheduler*/, + true, log_number, this, false /* concurrent_memtable_writes */, + next_sequence, &has_valid_writes, seq_per_batch_, batch_per_txn_); + } + // If column family was not found, it might mean that the WAL write + // batch references to the column family that was dropped after the + // insert. We don't want to fail the whole write batch in that case -- + // we just ignore the update. + // That's why we set ignore missing column families to true + // passing null flush_scheduler will disable memtable flushing which is + // needed for secondary instances + if (status.ok()) { + for (const auto id : column_family_ids) { + ColumnFamilyData* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(id); + if (cfd == nullptr) { + continue; + } + std::unordered_map<ColumnFamilyData*, uint64_t>::iterator iter = + cfd_to_current_log_.find(cfd); + if (iter == cfd_to_current_log_.end()) { + cfd_to_current_log_.insert({cfd, log_number}); + } else if (log_number > iter->second) { + iter->second = log_number; + } + } + auto last_sequence = *next_sequence - 1; + if ((*next_sequence != kMaxSequenceNumber) && + (versions_->LastSequence() <= last_sequence)) { + versions_->SetLastAllocatedSequence(last_sequence); + versions_->SetLastPublishedSequence(last_sequence); + versions_->SetLastSequence(last_sequence); + } + } else { + // We are treating this as a failure while reading since we read valid + // blocks that do not form coherent data + reader->GetReporter()->Corruption(record.size(), status); + } + } + if (status.ok() && !wal_read_status->ok()) { + status = *wal_read_status; + } + if (!status.ok()) { + return status; + } + } + // remove logreaders from map after successfully recovering the WAL + if (log_readers_.size() > 1) { + auto erase_iter = log_readers_.begin(); + std::advance(erase_iter, log_readers_.size() - 1); + log_readers_.erase(log_readers_.begin(), erase_iter); + } + return status; +} + +// Implementation of the DB interface +Status DBImplSecondary::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value) { + return GetImpl(read_options, column_family, key, value, + /*timestamp*/ nullptr); +} + +Status DBImplSecondary::Get(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, const Slice& key, + PinnableSlice* value, std::string* timestamp) { + return GetImpl(read_options, column_family, key, value, timestamp); +} + +Status DBImplSecondary::GetImpl(const ReadOptions& read_options, + ColumnFamilyHandle* column_family, + const Slice& key, PinnableSlice* pinnable_val, + std::string* timestamp) { + assert(pinnable_val != nullptr); + PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock); + StopWatch sw(immutable_db_options_.clock, stats_, DB_GET); + PERF_TIMER_GUARD(get_snapshot_time); + + assert(column_family); + if (read_options.timestamp) { + const Status s = FailIfTsMismatchCf( + column_family, *(read_options.timestamp), /*ts_for_read=*/true); + if (!s.ok()) { + return s; + } + } else { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return s; + } + } + + // Clear the timestamp for returning results so that we can distinguish + // between tombstone or key that has never been written later. + if (timestamp) { + timestamp->clear(); + } + + auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family); + ColumnFamilyData* cfd = cfh->cfd(); + if (tracer_) { + InstrumentedMutexLock lock(&trace_mutex_); + if (tracer_) { + tracer_->Get(column_family, key); + } + } + // Acquire SuperVersion + SuperVersion* super_version = GetAndRefSuperVersion(cfd); + SequenceNumber snapshot = versions_->LastSequence(); + GetWithTimestampReadCallback read_cb(snapshot); + MergeContext merge_context; + SequenceNumber max_covering_tombstone_seq = 0; + Status s; + LookupKey lkey(key, snapshot, read_options.timestamp); + PERF_TIMER_STOP(get_snapshot_time); + + bool done = false; + const Comparator* ucmp = column_family->GetComparator(); + assert(ucmp); + std::string* ts = ucmp->timestamp_size() > 0 ? timestamp : nullptr; + if (super_version->mem->Get(lkey, pinnable_val->GetSelf(), + /*columns=*/nullptr, ts, &s, &merge_context, + &max_covering_tombstone_seq, read_options, + false /* immutable_memtable */, &read_cb)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } else if ((s.ok() || s.IsMergeInProgress()) && + super_version->imm->Get( + lkey, pinnable_val->GetSelf(), /*columns=*/nullptr, ts, &s, + &merge_context, &max_covering_tombstone_seq, read_options, + &read_cb)) { + done = true; + pinnable_val->PinSelf(); + RecordTick(stats_, MEMTABLE_HIT); + } + if (!done && !s.ok() && !s.IsMergeInProgress()) { + ReturnAndCleanupSuperVersion(cfd, super_version); + return s; + } + if (!done) { + PERF_TIMER_GUARD(get_from_output_files_time); + PinnedIteratorsManager pinned_iters_mgr; + super_version->current->Get( + read_options, lkey, pinnable_val, /*columns=*/nullptr, ts, &s, + &merge_context, &max_covering_tombstone_seq, &pinned_iters_mgr, + /*value_found*/ nullptr, + /*key_exists*/ nullptr, /*seq*/ nullptr, &read_cb, /*is_blob*/ nullptr, + /*do_merge*/ true); + RecordTick(stats_, MEMTABLE_MISS); + } + { + PERF_TIMER_GUARD(get_post_process_time); + ReturnAndCleanupSuperVersion(cfd, super_version); + RecordTick(stats_, NUMBER_KEYS_READ); + size_t size = pinnable_val->size(); + RecordTick(stats_, BYTES_READ, size); + RecordTimeToHistogram(stats_, BYTES_PER_READ, size); + PERF_COUNTER_ADD(get_read_bytes, size); + } + return s; +} + +Iterator* DBImplSecondary::NewIterator(const ReadOptions& read_options, + ColumnFamilyHandle* column_family) { + if (read_options.managed) { + return NewErrorIterator( + Status::NotSupported("Managed iterator is not supported anymore.")); + } + if (read_options.read_tier == kPersistedTier) { + return NewErrorIterator(Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators.")); + } + + assert(column_family); + if (read_options.timestamp) { + const Status s = FailIfTsMismatchCf( + column_family, *(read_options.timestamp), /*ts_for_read=*/true); + if (!s.ok()) { + return NewErrorIterator(s); + } + } else { + const Status s = FailIfCfHasTs(column_family); + if (!s.ok()) { + return NewErrorIterator(s); + } + } + + Iterator* result = nullptr; + auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); + auto cfd = cfh->cfd(); + ReadCallback* read_callback = nullptr; // No read callback provided. + if (read_options.tailing) { + return NewErrorIterator(Status::NotSupported( + "tailing iterator not supported in secondary mode")); + } else if (read_options.snapshot != nullptr) { + // TODO (yanqin) support snapshot. + return NewErrorIterator( + Status::NotSupported("snapshot not supported in secondary mode")); + } else { + SequenceNumber snapshot(kMaxSequenceNumber); + result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); + } + return result; +} + +ArenaWrappedDBIter* DBImplSecondary::NewIteratorImpl( + const ReadOptions& read_options, ColumnFamilyData* cfd, + SequenceNumber snapshot, ReadCallback* read_callback, + bool expose_blob_index, bool allow_refresh) { + assert(nullptr != cfd); + SuperVersion* super_version = cfd->GetReferencedSuperVersion(this); + assert(snapshot == kMaxSequenceNumber); + snapshot = versions_->LastSequence(); + assert(snapshot != kMaxSequenceNumber); + auto db_iter = NewArenaWrappedDbIterator( + env_, read_options, *cfd->ioptions(), super_version->mutable_cf_options, + super_version->current, snapshot, + super_version->mutable_cf_options.max_sequential_skip_in_iterations, + super_version->version_number, read_callback, this, cfd, + expose_blob_index, read_options.snapshot ? false : allow_refresh); + auto internal_iter = NewInternalIterator( + db_iter->GetReadOptions(), cfd, super_version, db_iter->GetArena(), + snapshot, /* allow_unprepared_value */ true, db_iter); + db_iter->SetIterUnderDBIter(internal_iter); + return db_iter; +} + +Status DBImplSecondary::NewIterators( + const ReadOptions& read_options, + const std::vector<ColumnFamilyHandle*>& column_families, + std::vector<Iterator*>* iterators) { + if (read_options.managed) { + return Status::NotSupported("Managed iterator is not supported anymore."); + } + if (read_options.read_tier == kPersistedTier) { + return Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators."); + } + ReadCallback* read_callback = nullptr; // No read callback provided. + if (iterators == nullptr) { + return Status::InvalidArgument("iterators not allowed to be nullptr"); + } + + if (read_options.timestamp) { + for (auto* cf : column_families) { + assert(cf); + const Status s = FailIfTsMismatchCf(cf, *(read_options.timestamp), + /*ts_for_read=*/true); + if (!s.ok()) { + return s; + } + } + } else { + for (auto* cf : column_families) { + assert(cf); + const Status s = FailIfCfHasTs(cf); + if (!s.ok()) { + return s; + } + } + } + iterators->clear(); + iterators->reserve(column_families.size()); + if (read_options.tailing) { + return Status::NotSupported( + "tailing iterator not supported in secondary mode"); + } else if (read_options.snapshot != nullptr) { + // TODO (yanqin) support snapshot. + return Status::NotSupported("snapshot not supported in secondary mode"); + } else { + SequenceNumber read_seq(kMaxSequenceNumber); + for (auto cfh : column_families) { + ColumnFamilyData* cfd = static_cast<ColumnFamilyHandleImpl*>(cfh)->cfd(); + iterators->push_back( + NewIteratorImpl(read_options, cfd, read_seq, read_callback)); + } + } + return Status::OK(); +} + +Status DBImplSecondary::CheckConsistency() { + mutex_.AssertHeld(); + Status s = DBImpl::CheckConsistency(); + // If DBImpl::CheckConsistency() which is stricter returns success, then we + // do not need to give a second chance. + if (s.ok()) { + return s; + } + // It's possible that DBImpl::CheckConssitency() can fail because the primary + // may have removed certain files, causing the GetFileSize(name) call to + // fail and returning a PathNotFound. In this case, we take a best-effort + // approach and just proceed. + TEST_SYNC_POINT_CALLBACK( + "DBImplSecondary::CheckConsistency:AfterFirstAttempt", &s); + + if (immutable_db_options_.skip_checking_sst_file_sizes_on_db_open) { + return Status::OK(); + } + + std::vector<LiveFileMetaData> metadata; + versions_->GetLiveFilesMetaData(&metadata); + + std::string corruption_messages; + for (const auto& md : metadata) { + // md.name has a leading "/". + std::string file_path = md.db_path + md.name; + + uint64_t fsize = 0; + s = env_->GetFileSize(file_path, &fsize); + if (!s.ok() && + (env_->GetFileSize(Rocks2LevelTableFileName(file_path), &fsize).ok() || + s.IsPathNotFound())) { + s = Status::OK(); + } + if (!s.ok()) { + corruption_messages += + "Can't access " + md.name + ": " + s.ToString() + "\n"; + } + } + return corruption_messages.empty() ? Status::OK() + : Status::Corruption(corruption_messages); +} + +Status DBImplSecondary::TryCatchUpWithPrimary() { + assert(versions_.get() != nullptr); + assert(manifest_reader_.get() != nullptr); + Status s; + // read the manifest and apply new changes to the secondary instance + std::unordered_set<ColumnFamilyData*> cfds_changed; + JobContext job_context(0, true /*create_superversion*/); + { + InstrumentedMutexLock lock_guard(&mutex_); + s = static_cast_with_check<ReactiveVersionSet>(versions_.get()) + ->ReadAndApply(&mutex_, &manifest_reader_, + manifest_reader_status_.get(), &cfds_changed); + + ROCKS_LOG_INFO(immutable_db_options_.info_log, "Last sequence is %" PRIu64, + static_cast<uint64_t>(versions_->LastSequence())); + for (ColumnFamilyData* cfd : cfds_changed) { + if (cfd->IsDropped()) { + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "[%s] is dropped\n", + cfd->GetName().c_str()); + continue; + } + VersionStorageInfo::LevelSummaryStorage tmp; + ROCKS_LOG_DEBUG(immutable_db_options_.info_log, + "[%s] Level summary: %s\n", cfd->GetName().c_str(), + cfd->current()->storage_info()->LevelSummary(&tmp)); + } + + // list wal_dir to discover new WALs and apply new changes to the secondary + // instance + if (s.ok()) { + s = FindAndRecoverLogFiles(&cfds_changed, &job_context); + } + if (s.IsPathNotFound()) { + ROCKS_LOG_INFO( + immutable_db_options_.info_log, + "Secondary tries to read WAL, but WAL file(s) have already " + "been purged by primary."); + s = Status::OK(); + } + if (s.ok()) { + for (auto cfd : cfds_changed) { + cfd->imm()->RemoveOldMemTables(cfd->GetLogNumber(), + &job_context.memtables_to_free); + auto& sv_context = job_context.superversion_contexts.back(); + cfd->InstallSuperVersion(&sv_context, &mutex_); + sv_context.NewSuperVersion(); + } + } + } + job_context.Clean(); + + // Cleanup unused, obsolete files. + JobContext purge_files_job_context(0); + { + InstrumentedMutexLock lock_guard(&mutex_); + // Currently, secondary instance does not own the database files, thus it + // is unnecessary for the secondary to force full scan. + FindObsoleteFiles(&purge_files_job_context, /*force=*/false); + } + if (purge_files_job_context.HaveSomethingToDelete()) { + PurgeObsoleteFiles(purge_files_job_context); + } + purge_files_job_context.Clean(); + return s; +} + +Status DB::OpenAsSecondary(const Options& options, const std::string& dbname, + const std::string& secondary_path, DB** dbptr) { + *dbptr = nullptr; + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector<ColumnFamilyDescriptor> column_families; + column_families.emplace_back(kDefaultColumnFamilyName, cf_options); + std::vector<ColumnFamilyHandle*> handles; + + Status s = DB::OpenAsSecondary(db_options, dbname, secondary_path, + column_families, &handles, dbptr); + if (s.ok()) { + assert(handles.size() == 1); + delete handles[0]; + } + return s; +} + +Status DB::OpenAsSecondary( + const DBOptions& db_options, const std::string& dbname, + const std::string& secondary_path, + const std::vector<ColumnFamilyDescriptor>& column_families, + std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) { + *dbptr = nullptr; + + DBOptions tmp_opts(db_options); + Status s; + if (nullptr == tmp_opts.info_log) { + s = CreateLoggerFromOptions(secondary_path, tmp_opts, &tmp_opts.info_log); + if (!s.ok()) { + tmp_opts.info_log = nullptr; + return s; + } + } + + assert(tmp_opts.info_log != nullptr); + if (db_options.max_open_files != -1) { + std::ostringstream oss; + oss << "The primary instance may delete all types of files after they " + "become obsolete. The application can coordinate the primary and " + "secondary so that primary does not delete/rename files that are " + "currently being used by the secondary. Alternatively, a custom " + "Env/FS can be provided such that files become inaccessible only " + "after all primary and secondaries indicate that they are obsolete " + "and deleted. If the above two are not possible, you can open the " + "secondary instance with `max_open_files==-1` so that secondary " + "will eagerly keep all table files open. Even if a file is deleted, " + "its content can still be accessed via a prior open file " + "descriptor. This is a hacky workaround for only table files. If " + "none of the above is done, then point lookup or " + "range scan via the secondary instance can result in IOError: file " + "not found. This can be resolved by retrying " + "TryCatchUpWithPrimary()."; + ROCKS_LOG_WARN(tmp_opts.info_log, "%s", oss.str().c_str()); + } + + handles->clear(); + DBImplSecondary* impl = new DBImplSecondary(tmp_opts, dbname, secondary_path); + impl->versions_.reset(new ReactiveVersionSet( + dbname, &impl->immutable_db_options_, impl->file_options_, + impl->table_cache_.get(), impl->write_buffer_manager_, + &impl->write_controller_, impl->io_tracer_)); + impl->column_family_memtables_.reset( + new ColumnFamilyMemTablesImpl(impl->versions_->GetColumnFamilySet())); + impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath(); + + impl->mutex_.Lock(); + s = impl->Recover(column_families, true, false, false); + if (s.ok()) { + for (auto cf : column_families) { + auto cfd = + impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name); + if (nullptr == cfd) { + s = Status::InvalidArgument("Column family not found", cf.name); + break; + } + handles->push_back(new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_)); + } + } + SuperVersionContext sv_context(true /* create_superversion */); + if (s.ok()) { + for (auto cfd : *impl->versions_->GetColumnFamilySet()) { + sv_context.NewSuperVersion(); + cfd->InstallSuperVersion(&sv_context, &impl->mutex_); + } + } + impl->mutex_.Unlock(); + sv_context.Clean(); + if (s.ok()) { + *dbptr = impl; + for (auto h : *handles) { + impl->NewThreadStatusCfInfo( + static_cast_with_check<ColumnFamilyHandleImpl>(h)->cfd()); + } + } else { + for (auto h : *handles) { + delete h; + } + handles->clear(); + delete impl; + } + return s; +} + +Status DBImplSecondary::CompactWithoutInstallation( + const OpenAndCompactOptions& options, ColumnFamilyHandle* cfh, + const CompactionServiceInput& input, CompactionServiceResult* result) { + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } + InstrumentedMutexLock l(&mutex_); + auto cfd = static_cast_with_check<ColumnFamilyHandleImpl>(cfh)->cfd(); + if (!cfd) { + return Status::InvalidArgument("Cannot find column family" + + cfh->GetName()); + } + + std::unordered_set<uint64_t> input_set; + for (const auto& file_name : input.input_files) { + input_set.insert(TableFileNameToNumber(file_name)); + } + + auto* version = cfd->current(); + + ColumnFamilyMetaData cf_meta; + version->GetColumnFamilyMetaData(&cf_meta); + + const MutableCFOptions* mutable_cf_options = cfd->GetLatestMutableCFOptions(); + ColumnFamilyOptions cf_options = cfd->GetLatestCFOptions(); + VersionStorageInfo* vstorage = version->storage_info(); + + // Use comp_options to reuse some CompactFiles functions + CompactionOptions comp_options; + comp_options.compression = kDisableCompressionOption; + comp_options.output_file_size_limit = MaxFileSizeForLevel( + *mutable_cf_options, input.output_level, cf_options.compaction_style, + vstorage->base_level(), cf_options.level_compaction_dynamic_level_bytes); + + std::vector<CompactionInputFiles> input_files; + Status s = cfd->compaction_picker()->GetCompactionInputsFromFileNumbers( + &input_files, &input_set, vstorage, comp_options); + if (!s.ok()) { + return s; + } + + std::unique_ptr<Compaction> c; + assert(cfd->compaction_picker()); + c.reset(cfd->compaction_picker()->CompactFiles( + comp_options, input_files, input.output_level, vstorage, + *mutable_cf_options, mutable_db_options_, 0)); + assert(c != nullptr); + + c->SetInputVersion(version); + + // Create output directory if it's not existed yet + std::unique_ptr<FSDirectory> output_dir; + s = CreateAndNewDirectory(fs_.get(), secondary_path_, &output_dir); + if (!s.ok()) { + return s; + } + + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, + immutable_db_options_.info_log.get()); + + const int job_id = next_job_id_.fetch_add(1); + + // use primary host's db_id for running the compaction, but db_session_id is + // using the local one, which is to make sure the unique id is unique from + // the remote compactors. Because the id is generated from db_id, + // db_session_id and orig_file_number, unlike the local compaction, remote + // compaction cannot guarantee the uniqueness of orig_file_number, the file + // number is only assigned when compaction is done. + CompactionServiceCompactionJob compaction_job( + job_id, c.get(), immutable_db_options_, mutable_db_options_, + file_options_for_compaction_, versions_.get(), &shutting_down_, + &log_buffer, output_dir.get(), stats_, &mutex_, &error_handler_, + input.snapshots, table_cache_, &event_logger_, dbname_, io_tracer_, + options.canceled ? *options.canceled : kManualCompactionCanceledFalse_, + input.db_id, db_session_id_, secondary_path_, input, result); + + mutex_.Unlock(); + s = compaction_job.Run(); + mutex_.Lock(); + + // clean up + compaction_job.io_status().PermitUncheckedError(); + compaction_job.CleanupCompaction(); + c->ReleaseCompactionFiles(s); + c.reset(); + + TEST_SYNC_POINT_CALLBACK("DBImplSecondary::CompactWithoutInstallation::End", + &s); + result->status = s; + return s; +} + +Status DB::OpenAndCompact( + const OpenAndCompactOptions& options, const std::string& name, + const std::string& output_directory, const std::string& input, + std::string* output, + const CompactionServiceOptionsOverride& override_options) { + if (options.canceled && options.canceled->load(std::memory_order_acquire)) { + return Status::Incomplete(Status::SubCode::kManualCompactionPaused); + } + CompactionServiceInput compaction_input; + Status s = CompactionServiceInput::Read(input, &compaction_input); + if (!s.ok()) { + return s; + } + + compaction_input.db_options.max_open_files = -1; + compaction_input.db_options.compaction_service = nullptr; + if (compaction_input.db_options.statistics) { + compaction_input.db_options.statistics.reset(); + } + compaction_input.db_options.env = override_options.env; + compaction_input.db_options.file_checksum_gen_factory = + override_options.file_checksum_gen_factory; + compaction_input.db_options.statistics = override_options.statistics; + compaction_input.column_family.options.comparator = + override_options.comparator; + compaction_input.column_family.options.merge_operator = + override_options.merge_operator; + compaction_input.column_family.options.compaction_filter = + override_options.compaction_filter; + compaction_input.column_family.options.compaction_filter_factory = + override_options.compaction_filter_factory; + compaction_input.column_family.options.prefix_extractor = + override_options.prefix_extractor; + compaction_input.column_family.options.table_factory = + override_options.table_factory; + compaction_input.column_family.options.sst_partitioner_factory = + override_options.sst_partitioner_factory; + compaction_input.column_family.options.table_properties_collector_factories = + override_options.table_properties_collector_factories; + compaction_input.db_options.listeners = override_options.listeners; + + std::vector<ColumnFamilyDescriptor> column_families; + column_families.push_back(compaction_input.column_family); + // TODO: we have to open default CF, because of an implementation limitation, + // currently we just use the same CF option from input, which is not collect + // and open may fail. + if (compaction_input.column_family.name != kDefaultColumnFamilyName) { + column_families.emplace_back(kDefaultColumnFamilyName, + compaction_input.column_family.options); + } + + DB* db; + std::vector<ColumnFamilyHandle*> handles; + + s = DB::OpenAsSecondary(compaction_input.db_options, name, output_directory, + column_families, &handles, &db); + if (!s.ok()) { + return s; + } + + CompactionServiceResult compaction_result; + DBImplSecondary* db_secondary = static_cast_with_check<DBImplSecondary>(db); + assert(handles.size() > 0); + s = db_secondary->CompactWithoutInstallation( + options, handles[0], compaction_input, &compaction_result); + + Status serialization_status = compaction_result.Write(output); + + for (auto& handle : handles) { + delete handle; + } + delete db; + if (s.ok()) { + return serialization_status; + } + return s; +} + +Status DB::OpenAndCompact( + const std::string& name, const std::string& output_directory, + const std::string& input, std::string* output, + const CompactionServiceOptionsOverride& override_options) { + return OpenAndCompact(OpenAndCompactOptions(), name, output_directory, input, + output, override_options); +} + +#else // !ROCKSDB_LITE + +Status DB::OpenAsSecondary(const Options& /*options*/, + const std::string& /*name*/, + const std::string& /*secondary_path*/, + DB** /*dbptr*/) { + return Status::NotSupported("Not supported in ROCKSDB_LITE."); +} + +Status DB::OpenAsSecondary( + const DBOptions& /*db_options*/, const std::string& /*dbname*/, + const std::string& /*secondary_path*/, + const std::vector<ColumnFamilyDescriptor>& /*column_families*/, + std::vector<ColumnFamilyHandle*>* /*handles*/, DB** /*dbptr*/) { + return Status::NotSupported("Not supported in ROCKSDB_LITE."); +} +#endif // !ROCKSDB_LITE + +} // namespace ROCKSDB_NAMESPACE |