diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/db/external_sst_file_ingestion_job.h | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2. (tag: upstream/18.2.2)
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/db/external_sst_file_ingestion_job.h')
-rw-r--r-- | src/rocksdb/db/external_sst_file_ingestion_job.h | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/src/rocksdb/db/external_sst_file_ingestion_job.h b/src/rocksdb/db/external_sst_file_ingestion_job.h new file mode 100644 index 000000000..ce50ae86d --- /dev/null +++ b/src/rocksdb/db/external_sst_file_ingestion_job.h @@ -0,0 +1,201 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once +#include <string> +#include <unordered_set> +#include <vector> + +#include "db/column_family.h" +#include "db/internal_stats.h" +#include "db/snapshot_impl.h" +#include "env/file_system_tracer.h" +#include "logging/event_logger.h" +#include "options/db_options.h" +#include "rocksdb/db.h" +#include "rocksdb/file_system.h" +#include "rocksdb/sst_file_writer.h" +#include "util/autovector.h" + +namespace ROCKSDB_NAMESPACE { + +class Directories; +class SystemClock; + +struct IngestedFileInfo { + // External file path + std::string external_file_path; + // Smallest internal key in external file + InternalKey smallest_internal_key; + // Largest internal key in external file + InternalKey largest_internal_key; + // Sequence number for keys in external file + SequenceNumber original_seqno; + // Offset of the global sequence number field in the file, will + // be zero if version is 1 (global seqno is not supported) + size_t global_seqno_offset; + // External file size + uint64_t file_size; + // total number of keys in external file + uint64_t num_entries; + // total number of range deletions in external file + uint64_t num_range_deletions; + // Id of column family this file shoule be ingested into + uint32_t cf_id; + // TableProperties read from external file + TableProperties table_properties; + // Version of external file + int version; + + // FileDescriptor for the file inside the DB + FileDescriptor fd; + // file path that we picked for file inside the 
DB + std::string internal_file_path; + // Global sequence number that we picked for the file inside the DB + SequenceNumber assigned_seqno = 0; + // Level inside the DB we picked for the external file. + int picked_level = 0; + // Whether to copy or link the external sst file. copy_file will be set to + // false if ingestion_options.move_files is true and underlying FS + // supports link operation. Need to provide a default value to make the + // undefined-behavior sanity check of llvm happy. Since + // ingestion_options.move_files is false by default, thus copy_file is true + // by default. + bool copy_file = true; + // The checksum of ingested file + std::string file_checksum; + // The name of checksum function that generate the checksum + std::string file_checksum_func_name; + // The temperature of the file to be ingested + Temperature file_temperature = Temperature::kUnknown; + // Unique id of the file to be ingested + UniqueId64x2 unique_id{}; +}; + +class ExternalSstFileIngestionJob { + public: + ExternalSstFileIngestionJob( + VersionSet* versions, ColumnFamilyData* cfd, + const ImmutableDBOptions& db_options, const EnvOptions& env_options, + SnapshotList* db_snapshots, + const IngestExternalFileOptions& ingestion_options, + Directories* directories, EventLogger* event_logger, + const std::shared_ptr<IOTracer>& io_tracer) + : clock_(db_options.clock), + fs_(db_options.fs, io_tracer), + versions_(versions), + cfd_(cfd), + db_options_(db_options), + env_options_(env_options), + db_snapshots_(db_snapshots), + ingestion_options_(ingestion_options), + directories_(directories), + event_logger_(event_logger), + job_start_time_(clock_->NowMicros()), + consumed_seqno_count_(0), + io_tracer_(io_tracer) { + assert(directories != nullptr); + } + + // Prepare the job by copying external files into the DB. 
+ Status Prepare(const std::vector<std::string>& external_files_paths, + const std::vector<std::string>& files_checksums, + const std::vector<std::string>& files_checksum_func_names, + const Temperature& file_temperature, uint64_t next_file_number, + SuperVersion* sv); + + // Check if we need to flush the memtable before running the ingestion job + // This will be true if the files we are ingesting are overlapping with any + // key range in the memtable. + // + // @param super_version A referenced SuperVersion that will be held for the + // duration of this function. + // + // Thread-safe + Status NeedsFlush(bool* flush_needed, SuperVersion* super_version); + + // Will execute the ingestion job and prepare edit() to be applied. + // REQUIRES: Mutex held + Status Run(); + + // Update column family stats. + // REQUIRES: Mutex held + void UpdateStats(); + + // Cleanup after successful/failed job + void Cleanup(const Status& status); + + VersionEdit* edit() { return &edit_; } + + const autovector<IngestedFileInfo>& files_to_ingest() const { + return files_to_ingest_; + } + + // How many sequence numbers did we consume as part of the ingest job? + int ConsumedSequenceNumbersCount() const { return consumed_seqno_count_; } + + private: + // Open the external file and populate `file_to_ingest` with all the + // external information we need to ingest this file. + Status GetIngestedFileInfo(const std::string& external_file, + uint64_t new_file_number, + IngestedFileInfo* file_to_ingest, + SuperVersion* sv); + + // Assign `file_to_ingest` the appropriate sequence number and the lowest + // possible level that it can be ingested to according to compaction_style. 
+ // REQUIRES: Mutex held + Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv, + bool force_global_seqno, + CompactionStyle compaction_style, + SequenceNumber last_seqno, + IngestedFileInfo* file_to_ingest, + SequenceNumber* assigned_seqno); + + // File that we want to ingest behind always goes to the lowest level; + // we just check that it fits in the level, that DB allows ingest_behind, + // and that we don't have 0 seqnums at the upper levels. + // REQUIRES: Mutex held + Status CheckLevelForIngestedBehindFile(IngestedFileInfo* file_to_ingest); + + // Set the file global sequence number to `seqno` + Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest, + SequenceNumber seqno); + // Generate the file checksum and store in the IngestedFileInfo + IOStatus GenerateChecksumForIngestedFile(IngestedFileInfo* file_to_ingest); + + // Check if `file_to_ingest` can fit in level `level` + // REQUIRES: Mutex held + bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest, + int level); + + // Helper method to sync given file. + template <typename TWritableFile> + Status SyncIngestedFile(TWritableFile* file); + + SystemClock* clock_; + FileSystemPtr fs_; + VersionSet* versions_; + ColumnFamilyData* cfd_; + const ImmutableDBOptions& db_options_; + const EnvOptions& env_options_; + SnapshotList* db_snapshots_; + autovector<IngestedFileInfo> files_to_ingest_; + const IngestExternalFileOptions& ingestion_options_; + Directories* directories_; + EventLogger* event_logger_; + VersionEdit edit_; + uint64_t job_start_time_; + int consumed_seqno_count_; + // Set in ExternalSstFileIngestionJob::Prepare(), if true all files are + // ingested in L0 + bool files_overlap_{false}; + // Set in ExternalSstFileIngestionJob::Prepare(), if true and DB + // file_checksum_gen_factory is set, DB will generate checksum each file. + bool need_generate_file_checksum_{true}; + std::shared_ptr<IOTracer> io_tracer_; +}; + +} // namespace ROCKSDB_NAMESPACE |