summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/utilities/blob_db/blob_compaction_filter.h')
-rw-r--r--src/rocksdb/utilities/blob_db/blob_compaction_filter.h168
1 files changed, 168 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_compaction_filter.h b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
new file mode 100644
index 000000000..409df26ac
--- /dev/null
+++ b/src/rocksdb/utilities/blob_db/blob_compaction_filter.h
@@ -0,0 +1,168 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <unordered_set>
+
+#include "db/blob_index.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/env.h"
+#include "utilities/blob_db/blob_db_gc_stats.h"
+#include "utilities/blob_db/blob_db_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+namespace blob_db {
+
+struct BlobCompactionContext {
+ uint64_t next_file_number = 0;
+ std::unordered_set<uint64_t> current_blob_files;
+ SequenceNumber fifo_eviction_seq = 0;
+ uint64_t evict_expiration_up_to = 0;
+};
+
+struct BlobCompactionContextGC {
+ BlobDBImpl* blob_db_impl = nullptr;
+ uint64_t cutoff_file_number = 0;
+};
+
+// Compaction filter that deletes expired blob indexes from the base DB.
+// Comes into two varieties, one for the non-GC case and one for the GC case.
+class BlobIndexCompactionFilterBase : public CompactionFilter {
+ public:
+ BlobIndexCompactionFilterBase(BlobCompactionContext&& context,
+ uint64_t current_time, Statistics* stats)
+ : context_(std::move(context)),
+ current_time_(current_time),
+ statistics_(stats) {}
+
+ ~BlobIndexCompactionFilterBase() override {
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_COUNT, expired_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EXPIRED_SIZE, expired_size_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_COUNT, evicted_count_);
+ RecordTick(statistics_, BLOB_DB_BLOB_INDEX_EVICTED_SIZE, evicted_size_);
+ }
+
+ // Filter expired blob indexes regardless of snapshots.
+ bool IgnoreSnapshots() const override { return true; }
+
+ Decision FilterV2(int /*level*/, const Slice& key, ValueType value_type,
+ const Slice& value, std::string* /*new_value*/,
+ std::string* /*skip_until*/) const override;
+
+ protected:
+ Statistics* statistics() const { return statistics_; }
+
+ private:
+ BlobCompactionContext context_;
+ const uint64_t current_time_;
+ Statistics* statistics_;
+ // It is safe to not using std::atomic since the compaction filter, created
+ // from a compaction filter factroy, will not be called from multiple threads.
+ mutable uint64_t expired_count_ = 0;
+ mutable uint64_t expired_size_ = 0;
+ mutable uint64_t evicted_count_ = 0;
+ mutable uint64_t evicted_size_ = 0;
+};
+
+class BlobIndexCompactionFilter : public BlobIndexCompactionFilterBase {
+ public:
+ BlobIndexCompactionFilter(BlobCompactionContext&& context,
+ uint64_t current_time, Statistics* stats)
+ : BlobIndexCompactionFilterBase(std::move(context), current_time, stats) {
+ }
+
+ const char* Name() const override { return "BlobIndexCompactionFilter"; }
+};
+
+class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
+ public:
+ BlobIndexCompactionFilterGC(BlobCompactionContext&& context,
+ BlobCompactionContextGC&& context_gc,
+ uint64_t current_time, Statistics* stats)
+ : BlobIndexCompactionFilterBase(std::move(context), current_time, stats),
+ context_gc_(std::move(context_gc)) {}
+
+ ~BlobIndexCompactionFilterGC() override;
+
+ const char* Name() const override { return "BlobIndexCompactionFilterGC"; }
+
+ BlobDecision PrepareBlobOutput(const Slice& key, const Slice& existing_value,
+ std::string* new_value) const override;
+
+ private:
+ bool OpenNewBlobFileIfNeeded() const;
+ bool ReadBlobFromOldFile(const Slice& key, const BlobIndex& blob_index,
+ PinnableSlice* blob,
+ CompressionType* compression_type) const;
+ bool WriteBlobToNewFile(const Slice& key, const Slice& blob,
+ uint64_t* new_blob_file_number,
+ uint64_t* new_blob_offset) const;
+ bool CloseAndRegisterNewBlobFileIfNeeded() const;
+ bool CloseAndRegisterNewBlobFile() const;
+
+ private:
+ BlobCompactionContextGC context_gc_;
+ mutable std::shared_ptr<BlobFile> blob_file_;
+ mutable std::shared_ptr<Writer> writer_;
+ mutable BlobDBGarbageCollectionStats gc_stats_;
+};
+
+// Compaction filter factory; similarly to the filters above, it comes
+// in two flavors, one that creates filters that support GC, and one
+// that creates non-GC filters.
+class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
+ public:
+ BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
+ Statistics* _statistics)
+ : blob_db_impl_(_blob_db_impl), env_(_env), statistics_(_statistics) {}
+
+ protected:
+ BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
+ Env* env() const { return env_; }
+ Statistics* statistics() const { return statistics_; }
+
+ private:
+ BlobDBImpl* blob_db_impl_;
+ Env* env_;
+ Statistics* statistics_;
+};
+
+class BlobIndexCompactionFilterFactory
+ : public BlobIndexCompactionFilterFactoryBase {
+ public:
+ BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
+ Statistics* _statistics)
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
+ }
+
+ const char* Name() const override {
+ return "BlobIndexCompactionFilterFactory";
+ }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override;
+};
+
+class BlobIndexCompactionFilterFactoryGC
+ : public BlobIndexCompactionFilterFactoryBase {
+ public:
+ BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
+ Statistics* _statistics)
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _statistics) {
+ }
+
+ const char* Name() const override {
+ return "BlobIndexCompactionFilterFactoryGC";
+ }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override;
+};
+
+} // namespace blob_db
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE