summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/db_impl/db_impl_secondary.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/db/db_impl/db_impl_secondary.h')
-rw-r--r--src/rocksdb/db/db_impl/db_impl_secondary.h333
1 files changed, 333 insertions, 0 deletions
diff --git a/src/rocksdb/db/db_impl/db_impl_secondary.h b/src/rocksdb/db/db_impl/db_impl_secondary.h
new file mode 100644
index 000000000..24f2e7767
--- /dev/null
+++ b/src/rocksdb/db/db_impl/db_impl_secondary.h
@@ -0,0 +1,333 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+
+#include <string>
+#include <vector>
+#include "db/db_impl/db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// A wrapper class to hold log reader, log reporter, log status.
+class LogReaderContainer {
+ public:
+ LogReaderContainer()
+ : reader_(nullptr), reporter_(nullptr), status_(nullptr) {}
+ LogReaderContainer(Env* env, std::shared_ptr<Logger> info_log,
+ std::string fname,
+ std::unique_ptr<SequentialFileReader>&& file_reader,
+ uint64_t log_number) {
+ LogReporter* reporter = new LogReporter();
+ status_ = new Status();
+ reporter->env = env;
+ reporter->info_log = info_log.get();
+ reporter->fname = std::move(fname);
+ reporter->status = status_;
+ reporter_ = reporter;
+ // We intentially make log::Reader do checksumming even if
+ // paranoid_checks==false so that corruptions cause entire commits
+ // to be skipped instead of propagating bad information (like overly
+ // large sequence numbers).
+ reader_ = new log::FragmentBufferedReader(info_log, std::move(file_reader),
+ reporter, true /*checksum*/,
+ log_number);
+ }
+ log::FragmentBufferedReader* reader_;
+ log::Reader::Reporter* reporter_;
+ Status* status_;
+ ~LogReaderContainer() {
+ delete reader_;
+ delete reporter_;
+ delete status_;
+ }
+ private:
+ struct LogReporter : public log::Reader::Reporter {
+ Env* env;
+ Logger* info_log;
+ std::string fname;
+ Status* status; // nullptr if immutable_db_options_.paranoid_checks==false
+ void Corruption(size_t bytes, const Status& s) override {
+ ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
+ (this->status == nullptr ? "(ignoring error) " : ""),
+ fname.c_str(), static_cast<int>(bytes),
+ s.ToString().c_str());
+ if (this->status != nullptr && this->status->ok()) {
+ *this->status = s;
+ }
+ }
+ };
+};
+
+// The secondary instance shares access to the storage as the primary.
+// The secondary is able to read and replay changes described in both the
+// MANIFEST and the WAL files without coordination with the primary.
+// The secondary instance can be opened using `DB::OpenAsSecondary`. After
+// that, it can call `DBImplSecondary::TryCatchUpWithPrimary` to make best
+// effort attempts to catch up with the primary.
+class DBImplSecondary : public DBImpl {
+ public:
+ DBImplSecondary(const DBOptions& options, const std::string& dbname);
+ ~DBImplSecondary() override;
+
+ // Recover by replaying MANIFEST and WAL. Also initialize manifest_reader_
+ // and log_readers_ to facilitate future operations.
+ Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
+ bool read_only, bool error_if_log_file_exist,
+ bool error_if_data_exists_in_logs,
+ uint64_t* = nullptr) override;
+
+ // Implementations of the DB interface
+ using DB::Get;
+ Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value) override;
+
+ Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+ const Slice& key, PinnableSlice* value);
+
+ using DBImpl::NewIterator;
+ Iterator* NewIterator(const ReadOptions&,
+ ColumnFamilyHandle* column_family) override;
+
+ ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& read_options,
+ ColumnFamilyData* cfd,
+ SequenceNumber snapshot,
+ ReadCallback* read_callback);
+
+ Status NewIterators(const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_families,
+ std::vector<Iterator*>* iterators) override;
+
+ using DBImpl::Put;
+ Status Put(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::Merge;
+ Status Merge(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,
+ const Slice& /*value*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::Delete;
+ Status Delete(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::SingleDelete;
+ Status SingleDelete(const WriteOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice& /*key*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ Status Write(const WriteOptions& /*options*/,
+ WriteBatch* /*updates*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::CompactRange;
+ Status CompactRange(const CompactRangeOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const Slice* /*begin*/, const Slice* /*end*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::CompactFiles;
+ Status CompactFiles(
+ const CompactionOptions& /*compact_options*/,
+ ColumnFamilyHandle* /*column_family*/,
+ const std::vector<std::string>& /*input_file_names*/,
+ const int /*output_level*/, const int /*output_path_id*/ = -1,
+ std::vector<std::string>* const /*output_file_names*/ = nullptr,
+ CompactionJobInfo* /*compaction_job_info*/ = nullptr) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ Status DisableFileDeletions() override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ Status EnableFileDeletions(bool /*force*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ Status GetLiveFiles(std::vector<std::string>&,
+ uint64_t* /*manifest_file_size*/,
+ bool /*flush_memtable*/ = true) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::Flush;
+ Status Flush(const FlushOptions& /*options*/,
+ ColumnFamilyHandle* /*column_family*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::SetDBOptions;
+ Status SetDBOptions(const std::unordered_map<std::string, std::string>&
+ /*options_map*/) override {
+ // Currently not supported because changing certain options may cause
+ // flush/compaction.
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::SetOptions;
+ Status SetOptions(
+ ColumnFamilyHandle* /*cfd*/,
+ const std::unordered_map<std::string, std::string>& /*options_map*/)
+ override {
+ // Currently not supported because changing certain options may cause
+ // flush/compaction and/or write to MANIFEST.
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DBImpl::SyncWAL;
+ Status SyncWAL() override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ using DB::IngestExternalFile;
+ Status IngestExternalFile(
+ ColumnFamilyHandle* /*column_family*/,
+ const std::vector<std::string>& /*external_files*/,
+ const IngestExternalFileOptions& /*ingestion_options*/) override {
+ return Status::NotSupported("Not supported operation in secondary mode.");
+ }
+
+ // Try to catch up with the primary by reading as much as possible from the
+ // log files until there is nothing more to read or encounters an error. If
+ // the amount of information in the log files to process is huge, this
+ // method can take long time due to all the I/O and CPU costs.
+ Status TryCatchUpWithPrimary() override;
+
+
+ // Try to find log reader using log_number from log_readers_ map, initialize
+ // if it doesn't exist
+ Status MaybeInitLogReader(uint64_t log_number,
+ log::FragmentBufferedReader** log_reader);
+
+ // Check if all live files exist on file system and that their file sizes
+ // matche to the in-memory records. It is possible that some live files may
+ // have been deleted by the primary. In this case, CheckConsistency() does
+ // not flag the missing file as inconsistency.
+ Status CheckConsistency() override;
+
+ protected:
+ // ColumnFamilyCollector is a write batch handler which does nothing
+ // except recording unique column family IDs
+ class ColumnFamilyCollector : public WriteBatch::Handler {
+ std::unordered_set<uint32_t> column_family_ids_;
+
+ Status AddColumnFamilyId(uint32_t column_family_id) {
+ if (column_family_ids_.find(column_family_id) ==
+ column_family_ids_.end()) {
+ column_family_ids_.insert(column_family_id);
+ }
+ return Status::OK();
+ }
+
+ public:
+ explicit ColumnFamilyCollector() {}
+
+ ~ColumnFamilyCollector() override {}
+
+ Status PutCF(uint32_t column_family_id, const Slice&,
+ const Slice&) override {
+ return AddColumnFamilyId(column_family_id);
+ }
+
+ Status DeleteCF(uint32_t column_family_id, const Slice&) override {
+ return AddColumnFamilyId(column_family_id);
+ }
+
+ Status SingleDeleteCF(uint32_t column_family_id, const Slice&) override {
+ return AddColumnFamilyId(column_family_id);
+ }
+
+ Status DeleteRangeCF(uint32_t column_family_id, const Slice&,
+ const Slice&) override {
+ return AddColumnFamilyId(column_family_id);
+ }
+
+ Status MergeCF(uint32_t column_family_id, const Slice&,
+ const Slice&) override {
+ return AddColumnFamilyId(column_family_id);
+ }
+
+ Status PutBlobIndexCF(uint32_t column_family_id, const Slice&,
+ const Slice&) override {
+ return AddColumnFamilyId(column_family_id);
+ }
+
+ const std::unordered_set<uint32_t>& column_families() const {
+ return column_family_ids_;
+ }
+ };
+
+ Status CollectColumnFamilyIdsFromWriteBatch(
+ const WriteBatch& batch, std::vector<uint32_t>* column_family_ids) {
+ assert(column_family_ids != nullptr);
+ column_family_ids->clear();
+ ColumnFamilyCollector handler;
+ Status s = batch.Iterate(&handler);
+ if (s.ok()) {
+ for (const auto& cf : handler.column_families()) {
+ column_family_ids->push_back(cf);
+ }
+ }
+ return s;
+ }
+
+ bool OwnTablesAndLogs() const override {
+ // Currently, the secondary instance does not own the database files. It
+ // simply opens the files of the primary instance and tracks their file
+ // descriptors until they become obsolete. In the future, the secondary may
+ // create links to database files. OwnTablesAndLogs will return true then.
+ return false;
+ }
+
+ private:
+ friend class DB;
+
+ // No copying allowed
+ DBImplSecondary(const DBImplSecondary&);
+ void operator=(const DBImplSecondary&);
+
+ using DBImpl::Recover;
+
+ Status FindAndRecoverLogFiles(
+ std::unordered_set<ColumnFamilyData*>* cfds_changed,
+ JobContext* job_context);
+ Status FindNewLogNumbers(std::vector<uint64_t>* logs);
+ // After manifest recovery, replay WALs and refresh log_readers_ if necessary
+ // REQUIRES: log_numbers are sorted in ascending order
+ Status RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
+ SequenceNumber* next_sequence,
+ std::unordered_set<ColumnFamilyData*>* cfds_changed,
+ JobContext* job_context);
+
+ std::unique_ptr<log::FragmentBufferedReader> manifest_reader_;
+ std::unique_ptr<log::Reader::Reporter> manifest_reporter_;
+ std::unique_ptr<Status> manifest_reader_status_;
+
+ // Cache log readers for each log number, used for continue WAL replay
+ // after recovery
+ std::map<uint64_t, std::unique_ptr<LogReaderContainer>> log_readers_;
+
+ // Current WAL number replayed for each column family.
+ std::unordered_map<ColumnFamilyData*, uint64_t> cfd_to_current_log_;
+};
+
+} // namespace ROCKSDB_NAMESPACE
+
+#endif // !ROCKSDB_LITE