summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/db/compaction/compaction_job.h
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/db/compaction/compaction_job.h500
1 files changed, 500 insertions, 0 deletions
diff --git a/src/rocksdb/db/compaction/compaction_job.h b/src/rocksdb/db/compaction/compaction_job.h
new file mode 100644
index 000000000..bfbce1011
--- /dev/null
+++ b/src/rocksdb/db/compaction/compaction_job.h
@@ -0,0 +1,500 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+
+#include <atomic>
+#include <deque>
+#include <functional>
+#include <limits>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "db/blob/blob_file_completion_callback.h"
+#include "db/column_family.h"
+#include "db/compaction/compaction_iterator.h"
+#include "db/compaction/compaction_outputs.h"
+#include "db/flush_scheduler.h"
+#include "db/internal_stats.h"
+#include "db/job_context.h"
+#include "db/log_writer.h"
+#include "db/memtable_list.h"
+#include "db/range_del_aggregator.h"
+#include "db/seqno_to_time_mapping.h"
+#include "db/version_edit.h"
+#include "db/write_controller.h"
+#include "db/write_thread.h"
+#include "logging/event_logger.h"
+#include "options/cf_options.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/compaction_job_stats.h"
+#include "rocksdb/db.h"
+#include "rocksdb/env.h"
+#include "rocksdb/memtablerep.h"
+#include "rocksdb/transaction_log.h"
+#include "table/scoped_arena_iterator.h"
+#include "util/autovector.h"
+#include "util/stop_watch.h"
+#include "util/thread_local.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+class Arena;
+class CompactionState;
+class ErrorHandler;
+class MemTable;
+class SnapshotChecker;
+class SystemClock;
+class TableCache;
+class Version;
+class VersionEdit;
+class VersionSet;
+
+class SubcompactionState;
+
+// CompactionJob is responsible for executing the compaction. Each (manual or
+// automated) compaction corresponds to a CompactionJob object, and usually
+// goes through the stages of `Prepare()`->`Run()`->`Install()`. CompactionJob
+// will divide the compaction into subcompactions and execute them in parallel
+// if needed.
+//
+// CompactionJob has 2 main stats:
+// 1. CompactionJobStats compaction_job_stats_
+// CompactionJobStats is a public data structure which is part of Compaction
+// event listener that rocksdb share the job stats with the user.
+// Internally it's an aggregation of all the compaction_job_stats from each
+// `SubcompactionState`:
+// +------------------------+
+// | SubcompactionState |
+// | |
+// +--------->| compaction_job_stats |
+// | | |
+// | +------------------------+
+// +------------------------+ |
+// | CompactionJob | | +------------------------+
+// | | | | SubcompactionState |
+// | compaction_job_stats +-----+ | |
+// | | +--------->| compaction_job_stats |
+// | | | | |
+// +------------------------+ | +------------------------+
+// |
+// | +------------------------+
+// | | SubcompactionState |
+// | | |
+// +--------->+ compaction_job_stats |
+// | | |
+// | +------------------------+
+// |
+// | +------------------------+
+// | | ... |
+// +--------->+ |
+// +------------------------+
+//
+// 2. CompactionStatsFull compaction_stats_
+// `CompactionStatsFull` is an internal stats about the compaction, which
+// is eventually sent to `ColumnFamilyData::internal_stats_` and used for
+// logging and public metrics.
+// Internally, it's an aggregation of stats_ from each `SubcompactionState`.
+// It has 2 parts, normal stats about the main compaction information and
+// the penultimate level output stats.
+// `SubcompactionState` maintains the CompactionOutputs for normal output and
+// the penultimate level output if exists, the per_level stats is
+// stored with the outputs.
+// +---------------------------+
+// | SubcompactionState |
+// | |
+// | +----------------------+ |
+// | | CompactionOutputs | |
+// | | (normal output) | |
+// +---->| stats_ | |
+// | | +----------------------+ |
+// | | |
+// | | +----------------------+ |
+// +--------------------------------+ | | | CompactionOutputs | |
+// | CompactionJob | | | | (penultimate_level) | |
+// | | +--------->| stats_ | |
+// | compaction_stats_ | | | | +----------------------+ |
+// | +-------------------------+ | | | | |
+// | |stats (normal) |------|----+ +---------------------------+
+// | +-------------------------+ | | |
+// | | | |
+// | +-------------------------+ | | | +---------------------------+
+// | |penultimate_level_stats +------+ | | SubcompactionState |
+// | +-------------------------+ | | | | |
+// | | | | | +----------------------+ |
+// | | | | | | CompactionOutputs | |
+// +--------------------------------+ | | | | (normal output) | |
+// | +---->| stats_ | |
+// | | +----------------------+ |
+// | | |
+// | | +----------------------+ |
+// | | | CompactionOutputs | |
+// | | | (penultimate_level) | |
+// +--------->| stats_ | |
+// | +----------------------+ |
+// | |
+// +---------------------------+
+
+class CompactionJob {
+ public:
+ CompactionJob(
+ int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
+ const MutableDBOptions& mutable_db_options,
+ const FileOptions& file_options, VersionSet* versions,
+ const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
+ FSDirectory* db_directory, FSDirectory* output_directory,
+ FSDirectory* blob_output_directory, Statistics* stats,
+ InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
+ std::vector<SequenceNumber> existing_snapshots,
+ SequenceNumber earliest_write_conflict_snapshot,
+ const SnapshotChecker* snapshot_checker, JobContext* job_context,
+ std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
+ bool paranoid_file_checks, bool measure_io_stats,
+ const std::string& dbname, CompactionJobStats* compaction_job_stats,
+ Env::Priority thread_pri, const std::shared_ptr<IOTracer>& io_tracer,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const std::string& db_id = "", const std::string& db_session_id = "",
+ std::string full_history_ts_low = "", std::string trim_ts = "",
+ BlobFileCompletionCallback* blob_callback = nullptr,
+ int* bg_compaction_scheduled = nullptr,
+ int* bg_bottom_compaction_scheduled = nullptr);
+
+ virtual ~CompactionJob();
+
+ // no copy/move
+ CompactionJob(CompactionJob&& job) = delete;
+ CompactionJob(const CompactionJob& job) = delete;
+ CompactionJob& operator=(const CompactionJob& job) = delete;
+
+ // REQUIRED: mutex held
+ // Prepare for the compaction by setting up boundaries for each subcompaction
+ void Prepare();
+ // REQUIRED mutex not held
+ // Launch threads for each subcompaction and wait for them to finish. After
+ // that, verify table is usable and finally do bookkeeping to unify
+ // subcompaction results
+ Status Run();
+
+ // REQUIRED: mutex held
+ // Add compaction input/output to the current version
+ Status Install(const MutableCFOptions& mutable_cf_options);
+
+ // Return the IO status
+ IOStatus io_status() const { return io_status_; }
+
+ protected:
+ void UpdateCompactionStats();
+ void LogCompaction();
+ virtual void RecordCompactionIOStats();
+ void CleanupCompaction();
+
+ // Call compaction filter. Then iterate through input and compact the
+ // kv-pairs
+ void ProcessKeyValueCompaction(SubcompactionState* sub_compact);
+
+ CompactionState* compact_;
+ InternalStats::CompactionStatsFull compaction_stats_;
+ const ImmutableDBOptions& db_options_;
+ const MutableDBOptions mutable_db_options_copy_;
+ LogBuffer* log_buffer_;
+ FSDirectory* output_directory_;
+ Statistics* stats_;
+ // Is this compaction creating a file in the bottom most level?
+ bool bottommost_level_;
+
+ Env::WriteLifeTimeHint write_hint_;
+
+ IOStatus io_status_;
+
+ CompactionJobStats* compaction_job_stats_;
+
+ private:
+ friend class CompactionJobTestBase;
+
+ // Generates a histogram representing potential divisions of key ranges from
+ // the input. It adds the starting and/or ending keys of certain input files
+ // to the working set and then finds the approximate size of data in between
+ // each consecutive pair of slices. Then it divides these ranges into
+ // consecutive groups such that each group has a similar size.
+ void GenSubcompactionBoundaries();
+
+ // Get the number of planned subcompactions based on max_subcompactions and
+ // extra reserved resources
+ uint64_t GetSubcompactionsLimit();
+
+ // Additional reserved threads are reserved and the number is stored in
+ // extra_num_subcompaction_threads_reserved__. For now, this happens only if
+ // the compaction priority is round-robin and max_subcompactions is not
+ // sufficient (extra resources may be needed)
+ void AcquireSubcompactionResources(int num_extra_required_subcompactions);
+
+ // Additional threads may be reserved during IncreaseSubcompactionResources()
+ // if num_actual_subcompactions is less than num_planned_subcompactions.
+ // Additional threads will be released and the bg_compaction_scheduled_ or
+ // bg_bottom_compaction_scheduled_ will be updated if they are used.
+ // DB Mutex lock is required.
+ void ShrinkSubcompactionResources(uint64_t num_extra_resources);
+
+ // Release all reserved threads and update the compaction limits.
+ void ReleaseSubcompactionResources();
+
+ CompactionServiceJobStatus ProcessKeyValueCompactionWithCompactionService(
+ SubcompactionState* sub_compact);
+
+ // update the thread status for starting a compaction.
+ void ReportStartedCompaction(Compaction* compaction);
+
+ Status FinishCompactionOutputFile(const Status& input_status,
+ SubcompactionState* sub_compact,
+ CompactionOutputs& outputs,
+ const Slice& next_table_min_key);
+ Status InstallCompactionResults(const MutableCFOptions& mutable_cf_options);
+ Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
+ CompactionOutputs& outputs);
+ void UpdateCompactionJobStats(
+ const InternalStats::CompactionStats& stats) const;
+ void RecordDroppedKeys(const CompactionIterationStats& c_iter_stats,
+ CompactionJobStats* compaction_job_stats = nullptr);
+
+ void UpdateCompactionInputStatsHelper(int* num_files, uint64_t* bytes_read,
+ int input_level);
+
+ void NotifyOnSubcompactionBegin(SubcompactionState* sub_compact);
+
+ void NotifyOnSubcompactionCompleted(SubcompactionState* sub_compact);
+
+ uint32_t job_id_;
+
+ // DBImpl state
+ const std::string& dbname_;
+ const std::string db_id_;
+ const std::string db_session_id_;
+ const FileOptions file_options_;
+
+ Env* env_;
+ std::shared_ptr<IOTracer> io_tracer_;
+ FileSystemPtr fs_;
+ // env_option optimized for compaction table reads
+ FileOptions file_options_for_read_;
+ VersionSet* versions_;
+ const std::atomic<bool>* shutting_down_;
+ const std::atomic<bool>& manual_compaction_canceled_;
+ FSDirectory* db_directory_;
+ FSDirectory* blob_output_directory_;
+ InstrumentedMutex* db_mutex_;
+ ErrorHandler* db_error_handler_;
+ // If there were two snapshots with seq numbers s1 and
+ // s2 and s1 < s2, and if we find two instances of a key k1 then lies
+ // entirely within s1 and s2, then the earlier version of k1 can be safely
+ // deleted because that version is not visible in any snapshot.
+ std::vector<SequenceNumber> existing_snapshots_;
+
+ // This is the earliest snapshot that could be used for write-conflict
+ // checking by a transaction. For any user-key newer than this snapshot, we
+ // should make sure not to remove evidence that a write occurred.
+ SequenceNumber earliest_write_conflict_snapshot_;
+
+ const SnapshotChecker* const snapshot_checker_;
+
+ JobContext* job_context_;
+
+ std::shared_ptr<Cache> table_cache_;
+
+ EventLogger* event_logger_;
+
+ bool paranoid_file_checks_;
+ bool measure_io_stats_;
+ // Stores the Slices that designate the boundaries for each subcompaction
+ std::vector<std::string> boundaries_;
+ Env::Priority thread_pri_;
+ std::string full_history_ts_low_;
+ std::string trim_ts_;
+ BlobFileCompletionCallback* blob_callback_;
+
+ uint64_t GetCompactionId(SubcompactionState* sub_compact) const;
+ // Stores the number of reserved threads in shared env_ for the number of
+ // extra subcompaction in kRoundRobin compaction priority
+ int extra_num_subcompaction_threads_reserved_;
+
+ // Stores the pointer to bg_compaction_scheduled_,
+ // bg_bottom_compaction_scheduled_ in DBImpl. Mutex is required when accessing
+ // or updating it.
+ int* bg_compaction_scheduled_;
+ int* bg_bottom_compaction_scheduled_;
+
+ // Stores the sequence number to time mapping gathered from all input files
+ // it also collects the smallest_seqno -> oldest_ancester_time from the SST.
+ SeqnoToTimeMapping seqno_time_mapping_;
+
+ // Minimal sequence number for preserving the time information. The time info
+ // older than this sequence number won't be preserved after the compaction and
+ // if it's bottommost compaction, the seq num will be zeroed out.
+ SequenceNumber preserve_time_min_seqno_ = kMaxSequenceNumber;
+
+ // Minimal sequence number to preclude the data from the last level. If the
+ // key has bigger (newer) sequence number than this, it will be precluded from
+ // the last level (output to penultimate level).
+ SequenceNumber preclude_last_level_min_seqno_ = kMaxSequenceNumber;
+
+ // Get table file name in where it's outputting to, which should also be in
+ // `output_directory_`.
+ virtual std::string GetTableFileName(uint64_t file_number);
+ // The rate limiter priority (io_priority) is determined dynamically here.
+ // The Compaction Read and Write priorities are the same for different
+ // scenarios, such as write stalled.
+ Env::IOPriority GetRateLimiterPriority();
+};
+
+// CompactionServiceInput is used the pass compaction information between two
+// db instances. It contains the information needed to do a compaction. It
+// doesn't contain the LSM tree information, which is passed though MANIFEST
+// file.
+struct CompactionServiceInput {
+ ColumnFamilyDescriptor column_family;
+
+ DBOptions db_options;
+
+ std::vector<SequenceNumber> snapshots;
+
+ // SST files for compaction, it should already be expended to include all the
+ // files needed for this compaction, for both input level files and output
+ // level files.
+ std::vector<std::string> input_files;
+ int output_level;
+
+ // db_id is used to generate unique id of sst on the remote compactor
+ std::string db_id;
+
+ // information for subcompaction
+ bool has_begin = false;
+ std::string begin;
+ bool has_end = false;
+ std::string end;
+
+ // serialization interface to read and write the object
+ static Status Read(const std::string& data_str, CompactionServiceInput* obj);
+ Status Write(std::string* output);
+
+ // Initialize a dummy ColumnFamilyDescriptor
+ CompactionServiceInput() : column_family("", ColumnFamilyOptions()) {}
+
+#ifndef NDEBUG
+ bool TEST_Equals(CompactionServiceInput* other);
+ bool TEST_Equals(CompactionServiceInput* other, std::string* mismatch);
+#endif // NDEBUG
+};
+
+// CompactionServiceOutputFile is the metadata for the output SST file
+struct CompactionServiceOutputFile {
+ std::string file_name;
+ SequenceNumber smallest_seqno;
+ SequenceNumber largest_seqno;
+ std::string smallest_internal_key;
+ std::string largest_internal_key;
+ uint64_t oldest_ancester_time;
+ uint64_t file_creation_time;
+ uint64_t paranoid_hash;
+ bool marked_for_compaction;
+ UniqueId64x2 unique_id;
+
+ CompactionServiceOutputFile() = default;
+ CompactionServiceOutputFile(
+ const std::string& name, SequenceNumber smallest, SequenceNumber largest,
+ std::string _smallest_internal_key, std::string _largest_internal_key,
+ uint64_t _oldest_ancester_time, uint64_t _file_creation_time,
+ uint64_t _paranoid_hash, bool _marked_for_compaction,
+ UniqueId64x2 _unique_id)
+ : file_name(name),
+ smallest_seqno(smallest),
+ largest_seqno(largest),
+ smallest_internal_key(std::move(_smallest_internal_key)),
+ largest_internal_key(std::move(_largest_internal_key)),
+ oldest_ancester_time(_oldest_ancester_time),
+ file_creation_time(_file_creation_time),
+ paranoid_hash(_paranoid_hash),
+ marked_for_compaction(_marked_for_compaction),
+ unique_id(std::move(_unique_id)) {}
+};
+
+// CompactionServiceResult contains the compaction result from a different db
+// instance, with these information, the primary db instance with write
+// permission is able to install the result to the DB.
+struct CompactionServiceResult {
+ Status status;
+ std::vector<CompactionServiceOutputFile> output_files;
+ int output_level;
+
+ // location of the output files
+ std::string output_path;
+
+ // some statistics about the compaction
+ uint64_t num_output_records = 0;
+ uint64_t total_bytes = 0;
+ uint64_t bytes_read = 0;
+ uint64_t bytes_written = 0;
+ CompactionJobStats stats;
+
+ // serialization interface to read and write the object
+ static Status Read(const std::string& data_str, CompactionServiceResult* obj);
+ Status Write(std::string* output);
+
+#ifndef NDEBUG
+ bool TEST_Equals(CompactionServiceResult* other);
+ bool TEST_Equals(CompactionServiceResult* other, std::string* mismatch);
+#endif // NDEBUG
+};
+
+// CompactionServiceCompactionJob is an read-only compaction job, it takes
+// input information from `compaction_service_input` and put result information
+// in `compaction_service_result`, the SST files are generated to `output_path`.
+class CompactionServiceCompactionJob : private CompactionJob {
+ public:
+ CompactionServiceCompactionJob(
+ int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
+ const MutableDBOptions& mutable_db_options,
+ const FileOptions& file_options, VersionSet* versions,
+ const std::atomic<bool>* shutting_down, LogBuffer* log_buffer,
+ FSDirectory* output_directory, Statistics* stats,
+ InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
+ std::vector<SequenceNumber> existing_snapshots,
+ std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
+ const std::string& dbname, const std::shared_ptr<IOTracer>& io_tracer,
+ const std::atomic<bool>& manual_compaction_canceled,
+ const std::string& db_id, const std::string& db_session_id,
+ std::string output_path,
+ const CompactionServiceInput& compaction_service_input,
+ CompactionServiceResult* compaction_service_result);
+
+ // Run the compaction in current thread and return the result
+ Status Run();
+
+ void CleanupCompaction();
+
+ IOStatus io_status() const { return CompactionJob::io_status(); }
+
+ protected:
+ void RecordCompactionIOStats() override;
+
+ private:
+ // Get table file name in output_path
+ std::string GetTableFileName(uint64_t file_number) override;
+ // Specific the compaction output path, otherwise it uses default DB path
+ const std::string output_path_;
+
+ // Compaction job input
+ const CompactionServiceInput& compaction_input_;
+
+ // Compaction job result
+ CompactionServiceResult* compaction_result_;
+};
+
+} // namespace ROCKSDB_NAMESPACE