Diffstat
-rw-r--r--  src/rocksdb/utilities/blob_db/blob_compaction_filter.cc  490
1 file changed, 490 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
new file mode 100644
index 000000000..86907e979
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.cc
@@ -0,0 +1,490 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include "utilities/blob_db/blob_compaction_filter.h"
+
+#include <cinttypes>
+
+#include "db/dbformat.h"
+#include "logging/logging.h"
+#include "rocksdb/system_clock.h"
+#include "test_util/sync_point.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+BlobIndexCompactionFilterBase::~BlobIndexCompactionFilterBase() {
+ if (blob_file_) {
+ CloseAndRegisterNewBlobFile();
+ }
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
+}
+
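+// Decides the fate of a key during compaction. Inlined (non-blob-index)
+// values are handed to the user compaction filter; expired TTL blob indexes
+// and indexes whose blob file has already been evicted are removed; remaining
+// non-TTL blob values are read back from their blob file so the user
+// compaction filter can be applied to them as well.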
+CompactionFilter::Decision BlobIndexCompactionFilterBase::FilterV2(
+ int level, const Slice& key, ValueType value_type, const Slice& value,
+ std::string* new_value, std::string* skip_until) const {
+ const CompactionFilter* ucf = user_comp_filter();
+ if (value_type != kBlobIndex) {
+ if (ucf == nullptr) {
+ return Decision::kKeep;
+ }
+ // Apply user compaction filter for inlined data.
+ CompactionFilter::Decision decision =
+ ucf->FilterV2(level, key, value_type, value, new_value, skip_until);
+ if (decision == Decision::kChangeValue) {
+ return HandleValueChange(key, new_value);
+ }
+ return decision;
+ }
+ BlobIndex blob_index;
+ Status s = blob_index.DecodeFrom(value);
+ if (!s.ok()) {
+ // Unable to decode blob index. Keeping the value.
+ return Decision::kKeep;
+ }
+ if (blob_index.HasTTL() && blob_index.expiration() <= current_time_) {
+ // Expired
+ expired_count_++;
+ expired_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ if (!blob_index.IsInlined() &&
+ blob_index.file_number() < context_.next_file_number &&
+ context_.current_blob_files.count(blob_index.file_number()) == 0) {
+    // Corresponding blob file is gone (most likely removed by FIFO eviction).
+ evicted_count_++;
+ evicted_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ if (context_.fifo_eviction_seq > 0 && blob_index.HasTTL() &&
+ blob_index.expiration() < context_.evict_expiration_up_to) {
+    // Hack: the internal key is passed to BlobIndexCompactionFilter so that
+    // it can extract the sequence number.
+ ParsedInternalKey ikey;
+ if (!ParseInternalKey(
+ key, &ikey,
+ context_.blob_db_impl->db_options_.allow_data_in_errors)
+ .ok()) {
+ assert(false);
+ return Decision::kKeep;
+ }
+    // Remove keys that could have been removed by the last FIFO eviction.
+    // If we get an error while parsing the key, ignore it and continue.
+ if (ikey.sequence < context_.fifo_eviction_seq) {
+ evicted_count_++;
+ evicted_size_ += key.size() + value.size();
+ return Decision::kRemove;
+ }
+ }
+ // Apply user compaction filter for all non-TTL blob data.
+ if (ucf != nullptr && !blob_index.HasTTL()) {
+    // Hack: the internal key is passed to BlobIndexCompactionFilter so that
+    // it can extract the sequence number.
+ ParsedInternalKey ikey;
+ if (!ParseInternalKey(
+ key, &ikey,
+ context_.blob_db_impl->db_options_.allow_data_in_errors)
+ .ok()) {
+ assert(false);
+ return Decision::kKeep;
+ }
+ // Read value from blob file.
+ PinnableSlice blob;
+ CompressionType compression_type = kNoCompression;
+ constexpr bool need_decompress = true;
+ if (!ReadBlobFromOldFile(ikey.user_key, blob_index, &blob, need_decompress,
+ &compression_type)) {
+ return Decision::kIOError;
+ }
+ CompactionFilter::Decision decision = ucf->FilterV2(
+ level, ikey.user_key, kValue, blob, new_value, skip_until);
+ if (decision == Decision::kChangeValue) {
+ return HandleValueChange(ikey.user_key, new_value);
+ }
+ return decision;
+ }
+ return Decision::kKeep;
+}
+
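+// Called when the user compaction filter produced a new value: values smaller
+// than min_blob_size stay inline (kChangeValue); larger ones are compressed
+// if configured, written to a new blob file, and re-encoded as a blob index
+// (kChangeBlobIndex).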
+CompactionFilter::Decision BlobIndexCompactionFilterBase::HandleValueChange(
+ const Slice& key, std::string* new_value) const {
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ if (new_value->size() < blob_db_impl->bdb_options_.min_blob_size) {
+ // Keep new_value inlined.
+ return Decision::kChangeValue;
+ }
+ if (!OpenNewBlobFileIfNeeded()) {
+ return Decision::kIOError;
+ }
+ Slice new_blob_value(*new_value);
+ std::string compression_output;
+ if (blob_db_impl->bdb_options_.compression != kNoCompression) {
+ new_blob_value =
+ blob_db_impl->GetCompressedSlice(new_blob_value, &compression_output);
+ }
+ uint64_t new_blob_file_number = 0;
+ uint64_t new_blob_offset = 0;
+ if (!WriteBlobToNewFile(key, new_blob_value, &new_blob_file_number,
+ &new_blob_offset)) {
+ return Decision::kIOError;
+ }
+ if (!CloseAndRegisterNewBlobFileIfNeeded()) {
+ return Decision::kIOError;
+ }
+ BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
+ new_blob_value.size(),
+ blob_db_impl->bdb_options_.compression);
+ return Decision::kChangeBlobIndex;
+}
+
+BlobIndexCompactionFilterGC::~BlobIndexCompactionFilterGC() {
+ assert(context().blob_db_impl);
+
+ ROCKS_LOG_INFO(context().blob_db_impl->db_options_.info_log,
+ "GC pass finished %s: encountered %" PRIu64 " blobs (%" PRIu64
+ " bytes), relocated %" PRIu64 " blobs (%" PRIu64
+ " bytes), created %" PRIu64 " new blob file(s)",
+ !gc_stats_.HasError() ? "successfully" : "with failure",
+ gc_stats_.AllBlobs(), gc_stats_.AllBytes(),
+ gc_stats_.RelocatedBlobs(), gc_stats_.RelocatedBytes(),
+ gc_stats_.NewFiles());
+
+ RecordTick(statistics(), BLOB_DB_GC_NUM_KEYS_RELOCATED,
+ gc_stats_.RelocatedBlobs());
+ RecordTick(statistics(), BLOB_DB_GC_BYTES_RELOCATED,
+ gc_stats_.RelocatedBytes());
+ RecordTick(statistics(), BLOB_DB_GC_NUM_NEW_FILES, gc_stats_.NewFiles());
+ RecordTick(statistics(), BLOB_DB_GC_FAILURES, gc_stats_.HasError());
+}
+
+bool BlobIndexCompactionFilterBase::IsBlobFileOpened() const {
+ if (blob_file_) {
+ assert(writer_);
+ return true;
+ }
+ return false;
+}
+
+bool BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded() const {
+ if (IsBlobFileOpened()) {
+ return true;
+ }
+
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ const Status s = blob_db_impl->CreateBlobFileAndWriter(
+ /* has_ttl */ false, ExpirationRange(), "compaction/GC", &blob_file_,
+ &writer_);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Error opening new blob file during compaction/GC, status: %s",
+ s.ToString().c_str());
+ blob_file_.reset();
+ writer_.reset();
+ return false;
+ }
+
+ assert(blob_file_);
+ assert(writer_);
+
+ return true;
+}
+
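+// Reads the raw blob referenced by blob_index from its blob file, optionally
+// decompressing it; returns false (and logs) on failure.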
+bool BlobIndexCompactionFilterBase::ReadBlobFromOldFile(
+ const Slice& key, const BlobIndex& blob_index, PinnableSlice* blob,
+ bool need_decompress, CompressionType* compression_type) const {
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ Status s = blob_db_impl->GetRawBlobFromFile(
+ key, blob_index.file_number(), blob_index.offset(), blob_index.size(),
+ blob, compression_type);
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Error reading blob during compaction/GC, key: %s (%s), status: %s",
+ key.ToString(/* output_hex */ true).c_str(),
+ blob_index.DebugString(/* output_hex */ true).c_str(),
+ s.ToString().c_str());
+
+ return false;
+ }
+
+ if (need_decompress && *compression_type != kNoCompression) {
+ s = blob_db_impl->DecompressSlice(*blob, *compression_type, blob);
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Uncompression error during blob read from file: %" PRIu64
+ " blob_offset: %" PRIu64 " blob_size: %" PRIu64
+ " key: %s status: '%s'",
+ blob_index.file_number(), blob_index.offset(), blob_index.size(),
+ key.ToString(/* output_hex */ true).c_str(), s.ToString().c_str());
+
+ return false;
+ }
+ }
+
+ return true;
+}
+
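+// Appends the key/blob pair to the currently open blob file and reports the
+// blob file number and offset of the newly written record.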
+bool BlobIndexCompactionFilterBase::WriteBlobToNewFile(
+ const Slice& key, const Slice& blob, uint64_t* new_blob_file_number,
+ uint64_t* new_blob_offset) const {
+ TEST_SYNC_POINT("BlobIndexCompactionFilterBase::WriteBlobToNewFile");
+ assert(new_blob_file_number);
+ assert(new_blob_offset);
+
+ assert(blob_file_);
+ *new_blob_file_number = blob_file_->BlobFileNumber();
+
+ assert(writer_);
+ uint64_t new_key_offset = 0;
+ const Status s = writer_->AddRecord(key, blob, kNoExpiration, &new_key_offset,
+ new_blob_offset);
+
+ if (!s.ok()) {
+ const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ ROCKS_LOG_ERROR(blob_db_impl->db_options_.info_log,
+ "Error writing blob to new file %s during compaction/GC, "
+ "key: %s, status: %s",
+ blob_file_->PathName().c_str(),
+ key.ToString(/* output_hex */ true).c_str(),
+ s.ToString().c_str());
+ return false;
+ }
+
+ const uint64_t new_size =
+ BlobLogRecord::kHeaderSize + key.size() + blob.size();
+ blob_file_->BlobRecordAdded(new_size);
+
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ blob_db_impl->total_blob_size_ += new_size;
+
+ return true;
+}
+
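+// Closes and registers the current blob file once it has reached the
+// configured blob_file_size; smaller files are left open for further writes.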
+bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFileIfNeeded()
+ const {
+ const BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+
+ assert(blob_file_);
+ if (blob_file_->GetFileSize() < blob_db_impl->bdb_options_.blob_file_size) {
+ return true;
+ }
+
+ return CloseAndRegisterNewBlobFile();
+}
+
+bool BlobIndexCompactionFilterBase::CloseAndRegisterNewBlobFile() const {
+ BlobDBImpl* const blob_db_impl = context_.blob_db_impl;
+ assert(blob_db_impl);
+ assert(blob_file_);
+
+ Status s;
+
+ {
+ WriteLock wl(&blob_db_impl->mutex_);
+
+ s = blob_db_impl->CloseBlobFile(blob_file_);
+
+ // Note: we delay registering the new blob file until it's closed to
+ // prevent FIFO eviction from processing it during compaction/GC.
+ blob_db_impl->RegisterBlobFile(blob_file_);
+ }
+
+ assert(blob_file_->Immutable());
+
+ if (!s.ok()) {
+ ROCKS_LOG_ERROR(
+ blob_db_impl->db_options_.info_log,
+ "Error closing new blob file %s during compaction/GC, status: %s",
+ blob_file_->PathName().c_str(), s.ToString().c_str());
+ }
+
+ blob_file_.reset();
+ return s.ok();
+}
+
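+// GC hook: decides whether a blob needs to be relocated. Inlined blobs, TTL
+// blobs, and blobs residing in files at or beyond the cutoff file number are
+// kept as-is; older blobs are read from their old file, re-compressed if the
+// compression setting has changed, and rewritten into a new blob file.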
+CompactionFilter::BlobDecision BlobIndexCompactionFilterGC::PrepareBlobOutput(
+ const Slice& key, const Slice& existing_value,
+ std::string* new_value) const {
+ assert(new_value);
+
+ const BlobDBImpl* const blob_db_impl = context().blob_db_impl;
+ (void)blob_db_impl;
+
+ assert(blob_db_impl);
+ assert(blob_db_impl->bdb_options_.enable_garbage_collection);
+
+ BlobIndex blob_index;
+ const Status s = blob_index.DecodeFrom(existing_value);
+ if (!s.ok()) {
+ gc_stats_.SetError();
+ return BlobDecision::kCorruption;
+ }
+
+ if (blob_index.IsInlined()) {
+ gc_stats_.AddBlob(blob_index.value().size());
+
+ return BlobDecision::kKeep;
+ }
+
+ gc_stats_.AddBlob(blob_index.size());
+
+ if (blob_index.HasTTL()) {
+ return BlobDecision::kKeep;
+ }
+
+ if (blob_index.file_number() >= context_gc_.cutoff_file_number) {
+ return BlobDecision::kKeep;
+ }
+
+ // Note: each compaction generates its own blob files, which, depending on the
+ // workload, might result in many small blob files. The total number of files
+ // is bounded though (determined by the number of compactions and the blob
+ // file size option).
+ if (!OpenNewBlobFileIfNeeded()) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ PinnableSlice blob;
+ CompressionType compression_type = kNoCompression;
+ std::string compression_output;
+ if (!ReadBlobFromOldFile(key, blob_index, &blob, false, &compression_type)) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+  // If the blob's compression type differs from the currently configured
+  // compression, re-compress it with the new compression type.
+ if (compression_type != blob_db_impl->bdb_options_.compression) {
+ if (compression_type != kNoCompression) {
+ const Status status =
+ blob_db_impl->DecompressSlice(blob, compression_type, &blob);
+ if (!status.ok()) {
+ gc_stats_.SetError();
+ return BlobDecision::kCorruption;
+ }
+ }
+ if (blob_db_impl->bdb_options_.compression != kNoCompression) {
+ blob_db_impl->GetCompressedSlice(blob, &compression_output);
+ blob = PinnableSlice(&compression_output);
+ blob.PinSelf();
+ }
+ }
+
+ uint64_t new_blob_file_number = 0;
+ uint64_t new_blob_offset = 0;
+ if (!WriteBlobToNewFile(key, blob, &new_blob_file_number, &new_blob_offset)) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ if (!CloseAndRegisterNewBlobFileIfNeeded()) {
+ gc_stats_.SetError();
+ return BlobDecision::kIOError;
+ }
+
+ BlobIndex::EncodeBlob(new_value, new_blob_file_number, new_blob_offset,
+ blob.size(), compression_type);
+
+ gc_stats_.AddRelocatedBlob(blob_index.size());
+
+ return BlobDecision::kChangeValue;
+}
+
+bool BlobIndexCompactionFilterGC::OpenNewBlobFileIfNeeded() const {
+ if (IsBlobFileOpened()) {
+ return true;
+ }
+ bool result = BlobIndexCompactionFilterBase::OpenNewBlobFileIfNeeded();
+ if (result) {
+ gc_stats_.AddNewFile();
+ }
+ return result;
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory(
+ const CompactionFilter::Context& context) const {
+ std::unique_ptr<CompactionFilter> user_comp_filter_from_factory;
+ if (user_comp_filter_factory_) {
+ user_comp_filter_from_factory =
+ user_comp_filter_factory_->CreateCompactionFilter(context);
+ }
+ return user_comp_filter_from_factory;
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context& _context) {
+ assert(clock());
+
+ int64_t current_time = 0;
+ Status s = clock()->GetCurrentTime(&current_time);
+ if (!s.ok()) {
+ return nullptr;
+ }
+ assert(current_time >= 0);
+
+ assert(blob_db_impl());
+
+ BlobCompactionContext context;
+ blob_db_impl()->GetCompactionContext(&context);
+
+ std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
+ CreateUserCompactionFilterFromFactory(_context);
+
+ return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilter(
+ std::move(context), user_comp_filter(),
+ std::move(user_comp_filter_from_factory), current_time, statistics()));
+}
+
+std::unique_ptr<CompactionFilter>
+BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
+ const CompactionFilter::Context& _context) {
+ assert(clock());
+
+ int64_t current_time = 0;
+ Status s = clock()->GetCurrentTime(&current_time);
+ if (!s.ok()) {
+ return nullptr;
+ }
+ assert(current_time >= 0);
+
+ assert(blob_db_impl());
+
+ BlobCompactionContext context;
+ BlobCompactionContextGC context_gc;
+ blob_db_impl()->GetCompactionContext(&context, &context_gc);
+
+ std::unique_ptr<CompactionFilter> user_comp_filter_from_factory =
+ CreateUserCompactionFilterFromFactory(_context);
+
+ return std::unique_ptr<CompactionFilter>(new BlobIndexCompactionFilterGC(
+ std::move(context), std::move(context_gc), user_comp_filter(),
+ std::move(user_comp_filter_from_factory), current_time, statistics()));
+}
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE