diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/blob_db/blob_compaction_filter.cc | |
parent | Initial commit. (diff) | |
download | ceph-b26c4052f3542036551aa9dec9caa4226e456195.tar.xz ceph-b26c4052f3542036551aa9dec9caa4226e456195.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to '')
-rw-r--r-- | src/rocksdb/utilities/blob_db/blob_compaction_filter.cc | 490 |
1 file changed, 490 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc new file mode 100644 index 000000000..86907e979 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc @@ -0,0 +1,490 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "utilities/blob_db/blob_compaction_filter.h" + +#include <cinttypes> + +#include "db/dbformat.h" +#include "logging/logging.h" +#include "rocksdb/system_clock.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { +namespace blob_db { + +BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() { + if (blob_file_) { + CloseAndRegisterNewBlobFile(); + } + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_); + RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_); +} + +CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2( + int level, const Slice& key, ValueType value_type, const Slice& value, + std::string* new_value, std::string* skip_until) const { + const CompactionFilter* ucf = user_comp_filter(); + if (value_type != kBlobIndex) { + if (ucf == nullptr) { + return Decision::kKeep; + } + // Apply user compaction filter for inlined data. + CompactionFilter::Decision decision = + ucf->FilterV2(level, key, value_type, value, new_value, skip_until); + if (decision == Decision::kChangeValue) { + return HandleValueChange(key, new_value); + } + return decision; + } + BlobIndex blob_index; + Status s = blob_index.DecodeFrom(value); + if (!s.ok()) { + // Unable to decode blob index. 
Keeping the value. + return Decision::kKeep; + } + if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) { + // Expired + expired_count_++; + expired_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (!blob_index.IsInlined() && + blob_index.file_number() < context_.next_file_number && + context_.current_blob_files.count(blob_index.file_number()) == 0) { + // Corresponding blob file gone (most likely, evicted by FIFO eviction). + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() && + blob_index.expiration() < context_.evict_expiration_up_to) { + // Hack: Internal key is passed to BlobIndexCompactionFilter for it to + // get sequence number. + ParsedInternalKey ikey; + if (!ParseInternalKey( + key, &ikey, + context_.blob_db_impl->db_options_.allow_data_in_errors) + .ok()) { + assert(false); + return Decision::kKeep; + } + // Remove keys that could have been remove by last FIFO eviction. + // If get error while parsing key, ignore and continue. + if (ikey.sequence < context_.fifo_eviction_seq) { + evicted_count_++; + evicted_size_ += key.size() + value.size(); + return Decision::kRemove; + } + } + // Apply user compaction filter for all non-TTL blob data. + if (ucf != nullptr && !blob_index.HasTTL()) { + // Hack: Internal key is passed to BlobIndexCompactionFilter for it to + // get sequence number. + ParsedInternalKey ikey; + if (!ParseInternalKey( + key, &ikey, + context_.blob_db_impl->db_options_.allow_data_in_errors) + .ok()) { + assert(false); + return Decision::kKeep; + } + // Read value from blob file. 
+ PinnableSlice blob; + CompressionType compression_type = kNoCompression; + constexpr bool need_decompress = true; + if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress, + &compression_type)) { + return Decision::kIOError; + } + CompactionFilter::Decision decision = ucf->FilterV2( + level, ikey.user_key, kValue, blob, new_value, skip_until); + if (decision == Decision::kChangeValue) { + return HandleValueChange(ikey.user_key, new_value); + } + return decision; + } + return Decision::kKeep; +} + +CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange( + const Slice& key, std::string* new_value) const { + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) { + // Keep new_value inlined. + return Decision::kChangeValue; + } + if (!OpenNewBlobFileIfNeeded()) { + return Decision::kIOError; + } + Slice new_blob_value(*new_value); + std::string compression_output; + if (blob_db_impl->bdb_options_.compression != kNoCompression) { + new_blob_value = + blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output); + } + uint64_t new_blob_file_number = 0; + uint64_t new_blob_offset = 0; + if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number, + &new_blob_offset)) { + return Decision::kIOError; + } + if (!CloseAndRegisterNewBlobFileIfNeeded()) { + return Decision::kIOError; + } + BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, + new_blob_value.size(), + blob_db_impl->bdb_options_.compression); + return Decision::kChangeBlobIndex; +} + +BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() { + assert(context().blob_db_impl); + + ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log, + "GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64 + " bytes), relocated %" PRIu64 " blobs (%" PRIu64 + " bytes), created %" PRIu64 " new blob file(s)", + !gc_stats_.HasError() ? 
"successfully" : "with failure", + gc_stats_.AllBlobs(), gc_stats_.AllBytes(), + gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(), + gc_stats_.NewFiles()); + + RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED, + gc_stats_.RelocatedBlobs()); + RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED, + gc_stats_.RelocatedBytes()); + RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles()); + RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError()); +} + +bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const { + if (blob_file_) { + assert(writer_); + return true; + } + return false; +} + +bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const { + if (IsBlobFileOpened()) { + return true; + } + + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + const Status s = blob_db_impl->CreateBlobFileAndWriter( + /* has_ttl */ false, ExpirationRange(), "compaction/GC", &blob_file_, + &writer_); + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Error opening new blob file during compaction/GC, status: %s", + s.ToString().c_str()); + blob_file_.reset(); + writer_.reset(); + return false; + } + + assert(blob_file_); + assert(writer_); + + return true; +} + +bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile( + const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob, + bool need_decompress, CompressionType* compression_type) const { + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + Status s = blob_db_impl->GetRawBlobFromFile( + key, blob_index.file_number(), blob_index.offset(), blob_index.size(), + blob, compression_type); + + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Error reading blob during compaction/GC, key: %s (%s), status: %s", + key.ToString(/* output_hex */ true).c_str(), + blob_index.DebugString(/* output_hex */ true).c_str(), + s.ToString().c_str()); + + return false; + } + + 
if (need_decompress && *compression_type != kNoCompression) { + s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob); + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Uncompression error during blob read from file: %" PRIu64 + " blob_offset: %" PRIu64 " blob_size: %" PRIu64 + " key: %s status: '%s'", + blob_index.file_number(), blob_index.offset(), blob_index.size(), + key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str()); + + return false; + } + } + + return true; +} + +bool BlobIndexCompactionFilterBase::WriteBlobToNewFile( + const Slice& key, const Slice& blob, uint64_t* new_blob_file_number, + uint64_t* new_blob_offset) const { + TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile"); + assert(new_blob_file_number); + assert(new_blob_offset); + + assert(blob_file_); + *new_blob_file_number = blob_file_->BlobFileNumber(); + + assert(writer_); + uint64_t new_key_offset = 0; + const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset, + new_blob_offset); + + if (!s.ok()) { + const BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log, + "Error writing blob to new file %s during compaction/GC, " + "key: %s, status: %s", + blob_file_->PathName().c_str(), + key.ToString(/* output_hex */ true).c_str(), + s.ToString().c_str()); + return false; + } + + const uint64_t new_size = + BlobLogRecord::kHeaderSize + key.size() + blob.size(); + blob_file_->BlobRecordAdded(new_size); + + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + blob_db_impl->total_blob_size_ += new_size; + + return true; +} + +bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded() + const { + const BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + + assert(blob_file_); + if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) { + return 
true; + } + + return CloseAndRegisterNewBlobFile(); +} + +bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const { + BlobDBImpl* const blob_db_impl = context_.blob_db_impl; + assert(blob_db_impl); + assert(blob_file_); + + Status s; + + { + WriteLock wl(&blob_db_impl->mutex_); + + s = blob_db_impl->CloseBlobFile(blob_file_); + + // Note: we delay registering the new blob file until it's closed to + // prevent FIFO eviction from processing it during compaction/GC. + blob_db_impl->RegisterBlobFile(blob_file_); + } + + assert(blob_file_->Immutable()); + + if (!s.ok()) { + ROCKS_LOG_ERROR( + blob_db_impl->db_options_.info_log, + "Error closing new blob file %s during compaction/GC, status: %s", + blob_file_->PathName().c_str(), s.ToString().c_str()); + } + + blob_file_.reset(); + return s.ok(); +} + +CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput( + const Slice& key, const Slice& existing_value, + std::string* new_value) const { + assert(new_value); + + const BlobDBImpl* const blob_db_impl = context().blob_db_impl; + (void)blob_db_impl; + + assert(blob_db_impl); + assert(blob_db_impl->bdb_options_.enable_garbage_collection); + + BlobIndex blob_index; + const Status s = blob_index.DecodeFrom(existing_value); + if (!s.ok()) { + gc_stats_.SetError(); + return BlobDecision::kCorruption; + } + + if (blob_index.IsInlined()) { + gc_stats_.AddBlob(blob_index.value().size()); + + return BlobDecision::kKeep; + } + + gc_stats_.AddBlob(blob_index.size()); + + if (blob_index.HasTTL()) { + return BlobDecision::kKeep; + } + + if (blob_index.file_number() >= context_gc_.cutoff_file_number) { + return BlobDecision::kKeep; + } + + // Note: each compaction generates its own blob files, which, depending on the + // workload, might result in many small blob files. The total number of files + // is bounded though (determined by the number of compactions and the blob + // file size option). 
+ if (!OpenNewBlobFileIfNeeded()) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + PinnableSlice blob; + CompressionType compression_type = kNoCompression; + std::string compression_output; + if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + // If the compression_type is changed, re-compress it with the new compression + // type. + if (compression_type != blob_db_impl->bdb_options_.compression) { + if (compression_type != kNoCompression) { + const Status status = + blob_db_impl->DecompressSlice(blob, compression_type, &blob); + if (!status.ok()) { + gc_stats_.SetError(); + return BlobDecision::kCorruption; + } + } + if (blob_db_impl->bdb_options_.compression != kNoCompression) { + blob_db_impl->GetCompressedSlice(blob, &compression_output); + blob = PinnableSlice(&compression_output); + blob.PinSelf(); + } + } + + uint64_t new_blob_file_number = 0; + uint64_t new_blob_offset = 0; + if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + if (!CloseAndRegisterNewBlobFileIfNeeded()) { + gc_stats_.SetError(); + return BlobDecision::kIOError; + } + + BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset, + blob.size(), compression_type); + + gc_stats_.AddRelocatedBlob(blob_index.size()); + + return BlobDecision::kChangeValue; +} + +bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const { + if (IsBlobFileOpened()) { + return true; + } + bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded(); + if (result) { + gc_stats_.AddNewFile(); + } + return result; +} + +std::unique_ptr<CompactionFilter> +BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory( + const CompactionFilter::Context& context) const { + std::unique_ptr<CompactionFilter> user_comp_filter_from_factory; + if (user_comp_filter_factory_) { + 
user_comp_filter_from_factory = + user_comp_filter_factory_->CreateCompactionFilter(context); + } + return user_comp_filter_from_factory; +} + +std::unique_ptr<CompactionFilter> +BlobIndexCompactionFilterFactory::CreateCompactionFilter( + const CompactionFilter::Context& _context) { + assert(clock()); + + int64_t current_time = 0; + Status s = clock()->GetCurrentTime(¤t_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + + assert(blob_db_impl()); + + BlobCompactionContext context; + blob_db_impl()->GetCompactionContext(&context); + + std::unique_ptr<CompactionFilter> user_comp_filter_from_factory = + CreateUserCompactionFilterFromFactory(_context); + + return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter( + std::move(context), user_comp_filter(), + std::move(user_comp_filter_from_factory), current_time, statistics())); +} + +std::unique_ptr<CompactionFilter> +BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter( + const CompactionFilter::Context& _context) { + assert(clock()); + + int64_t current_time = 0; + Status s = clock()->GetCurrentTime(¤t_time); + if (!s.ok()) { + return nullptr; + } + assert(current_time >= 0); + + assert(blob_db_impl()); + + BlobCompactionContext context; + BlobCompactionContextGC context_gc; + blob_db_impl()->GetCompactionContext(&context, &context_gc); + + std::unique_ptr<CompactionFilter> user_comp_filter_from_factory = + CreateUserCompactionFilterFromFactory(_context); + + return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC( + std::move(context), std::move(context_gc), user_comp_filter(), + std::move(user_comp_filter_from_factory), current_time, statistics())); +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE |