summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/ttl
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/utilities/ttl/db_ttl_impl.cc609
-rw-r--r--src/rocksdb/utilities/ttl/db_ttl_impl.h245
-rw-r--r--src/rocksdb/utilities/ttl/ttl_test.cc912
3 files changed, 1766 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/ttl/db_ttl_impl.cc b/src/rocksdb/utilities/ttl/db_ttl_impl.cc
new file mode 100644
index 000000000..6ec9d87b0
--- /dev/null
+++ b/src/rocksdb/utilities/ttl/db_ttl_impl.cc
@@ -0,0 +1,609 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#ifndef ROCKSDB_LITE
+
+#include "utilities/ttl/db_ttl_impl.h"
+
+#include "db/write_batch_internal.h"
+#include "file/filename.h"
+#include "logging/logging.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/env.h"
+#include "rocksdb/iterator.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "rocksdb/utilities/options_type.h"
+#include "util/coding.h"
+
+namespace ROCKSDB_NAMESPACE {
+static std::unordered_map<std::string, OptionTypeInfo> ttl_merge_op_type_info =
+ {{"user_operator",
+ OptionTypeInfo::AsCustomSharedPtr<MergeOperator>(
+ 0, OptionVerificationType::kByName, OptionTypeFlags::kNone)}};
+
+TtlMergeOperator::TtlMergeOperator(
+ const std::shared_ptr<MergeOperator>& merge_op, SystemClock* clock)
+ : user_merge_op_(merge_op), clock_(clock) {
+ RegisterOptions("TtlMergeOptions", &user_merge_op_, &ttl_merge_op_type_info);
+}
+
+bool TtlMergeOperator::FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const {
+ const uint32_t ts_len = DBWithTTLImpl::kTSLength;
+ if (merge_in.existing_value && merge_in.existing_value->size() < ts_len) {
+ ROCKS_LOG_ERROR(merge_in.logger,
+ "Error: Could not remove timestamp from existing value.");
+ return false;
+ }
+
+ // Extract time-stamp from each operand to be passed to user_merge_op_
+ std::vector<Slice> operands_without_ts;
+ for (const auto& operand : merge_in.operand_list) {
+ if (operand.size() < ts_len) {
+ ROCKS_LOG_ERROR(merge_in.logger,
+ "Error: Could not remove timestamp from operand value.");
+ return false;
+ }
+ operands_without_ts.push_back(operand);
+ operands_without_ts.back().remove_suffix(ts_len);
+ }
+
+ // Apply the user merge operator (store result in *new_value)
+ bool good = true;
+ MergeOperationOutput user_merge_out(merge_out->new_value,
+ merge_out->existing_operand);
+ if (merge_in.existing_value) {
+ Slice existing_value_without_ts(merge_in.existing_value->data(),
+ merge_in.existing_value->size() - ts_len);
+ good = user_merge_op_->FullMergeV2(
+ MergeOperationInput(merge_in.key, &existing_value_without_ts,
+ operands_without_ts, merge_in.logger),
+ &user_merge_out);
+ } else {
+ good = user_merge_op_->FullMergeV2(
+ MergeOperationInput(merge_in.key, nullptr, operands_without_ts,
+ merge_in.logger),
+ &user_merge_out);
+ }
+
+ // Return false if the user merge operator returned false
+ if (!good) {
+ return false;
+ }
+
+ if (merge_out->existing_operand.data()) {
+ merge_out->new_value.assign(merge_out->existing_operand.data(),
+ merge_out->existing_operand.size());
+ merge_out->existing_operand = Slice(nullptr, 0);
+ }
+
+ // Augment the *new_value with the ttl time-stamp
+ int64_t curtime;
+ if (!clock_->GetCurrentTime(&curtime).ok()) {
+ ROCKS_LOG_ERROR(
+ merge_in.logger,
+ "Error: Could not get current time to be attached internally "
+ "to the new value.");
+ return false;
+ } else {
+ char ts_string[ts_len];
+ EncodeFixed32(ts_string, (int32_t)curtime);
+ merge_out->new_value.append(ts_string, ts_len);
+ return true;
+ }
+}
+
+bool TtlMergeOperator::PartialMergeMulti(const Slice& key,
+ const std::deque<Slice>& operand_list,
+ std::string* new_value,
+ Logger* logger) const {
+ const uint32_t ts_len = DBWithTTLImpl::kTSLength;
+ std::deque<Slice> operands_without_ts;
+
+ for (const auto& operand : operand_list) {
+ if (operand.size() < ts_len) {
+ ROCKS_LOG_ERROR(logger, "Error: Could not remove timestamp from value.");
+ return false;
+ }
+
+ operands_without_ts.push_back(
+ Slice(operand.data(), operand.size() - ts_len));
+ }
+
+ // Apply the user partial-merge operator (store result in *new_value)
+ assert(new_value);
+ if (!user_merge_op_->PartialMergeMulti(key, operands_without_ts, new_value,
+ logger)) {
+ return false;
+ }
+
+ // Augment the *new_value with the ttl time-stamp
+ int64_t curtime;
+ if (!clock_->GetCurrentTime(&curtime).ok()) {
+ ROCKS_LOG_ERROR(
+ logger,
+ "Error: Could not get current time to be attached internally "
+ "to the new value.");
+ return false;
+ } else {
+ char ts_string[ts_len];
+ EncodeFixed32(ts_string, (int32_t)curtime);
+ new_value->append(ts_string, ts_len);
+ return true;
+ }
+}
+
+Status TtlMergeOperator::PrepareOptions(const ConfigOptions& config_options) {
+ if (clock_ == nullptr) {
+ clock_ = config_options.env->GetSystemClock().get();
+ }
+ return MergeOperator::PrepareOptions(config_options);
+}
+
+Status TtlMergeOperator::ValidateOptions(
+ const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+ if (user_merge_op_ == nullptr) {
+ return Status::InvalidArgument(
+ "UserMergeOperator required by TtlMergeOperator");
+ } else if (clock_ == nullptr) {
+ return Status::InvalidArgument("SystemClock required by TtlMergeOperator");
+ } else {
+ return MergeOperator::ValidateOptions(db_opts, cf_opts);
+ }
+}
+
+void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
+ SystemClock* clock) {
+ if (options->compaction_filter) {
+ options->compaction_filter =
+ new TtlCompactionFilter(ttl, clock, options->compaction_filter);
+ } else {
+ options->compaction_filter_factory =
+ std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
+ ttl, clock, options->compaction_filter_factory));
+ }
+
+ if (options->merge_operator) {
+ options->merge_operator.reset(
+ new TtlMergeOperator(options->merge_operator, clock));
+ }
+}
+
+static std::unordered_map<std::string, OptionTypeInfo> ttl_type_info = {
+ {"ttl", {0, OptionType::kInt32T}},
+};
+
+static std::unordered_map<std::string, OptionTypeInfo> ttl_cff_type_info = {
+ {"user_filter_factory",
+ OptionTypeInfo::AsCustomSharedPtr<CompactionFilterFactory>(
+ 0, OptionVerificationType::kByNameAllowFromNull,
+ OptionTypeFlags::kNone)}};
+static std::unordered_map<std::string, OptionTypeInfo> user_cf_type_info = {
+ {"user_filter",
+ OptionTypeInfo::AsCustomRawPtr<const CompactionFilter>(
+ 0, OptionVerificationType::kByName, OptionTypeFlags::kAllowNull)}};
+
+TtlCompactionFilter::TtlCompactionFilter(
+ int32_t ttl, SystemClock* clock, const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter> _user_comp_filter_from_factory)
+ : LayeredCompactionFilterBase(_user_comp_filter,
+ std::move(_user_comp_filter_from_factory)),
+ ttl_(ttl),
+ clock_(clock) {
+ RegisterOptions("TTL", &ttl_, &ttl_type_info);
+ RegisterOptions("UserFilter", &user_comp_filter_, &user_cf_type_info);
+}
+
+bool TtlCompactionFilter::Filter(int level, const Slice& key,
+ const Slice& old_val, std::string* new_val,
+ bool* value_changed) const {
+ if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
+ return true;
+ }
+ if (user_comp_filter() == nullptr) {
+ return false;
+ }
+ assert(old_val.size() >= DBWithTTLImpl::kTSLength);
+ Slice old_val_without_ts(old_val.data(),
+ old_val.size() - DBWithTTLImpl::kTSLength);
+ if (user_comp_filter()->Filter(level, key, old_val_without_ts, new_val,
+ value_changed)) {
+ return true;
+ }
+ if (*value_changed) {
+ new_val->append(old_val.data() + old_val.size() - DBWithTTLImpl::kTSLength,
+ DBWithTTLImpl::kTSLength);
+ }
+ return false;
+}
+
+Status TtlCompactionFilter::PrepareOptions(
+ const ConfigOptions& config_options) {
+ if (clock_ == nullptr) {
+ clock_ = config_options.env->GetSystemClock().get();
+ }
+ return LayeredCompactionFilterBase::PrepareOptions(config_options);
+}
+
+Status TtlCompactionFilter::ValidateOptions(
+ const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+ if (clock_ == nullptr) {
+ return Status::InvalidArgument(
+ "SystemClock required by TtlCompactionFilter");
+ } else {
+ return LayeredCompactionFilterBase::ValidateOptions(db_opts, cf_opts);
+ }
+}
+
+TtlCompactionFilterFactory::TtlCompactionFilterFactory(
+ int32_t ttl, SystemClock* clock,
+ std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
+ : ttl_(ttl), clock_(clock), user_comp_filter_factory_(comp_filter_factory) {
+ RegisterOptions("UserOptions", &user_comp_filter_factory_,
+ &ttl_cff_type_info);
+ RegisterOptions("TTL", &ttl_, &ttl_type_info);
+}
+
+std::unique_ptr<CompactionFilter>
+TtlCompactionFilterFactory::CreateCompactionFilter(
+ const CompactionFilter::Context& context) {
+ std::unique_ptr<const CompactionFilter> user_comp_filter_from_factory =
+ nullptr;
+ if (user_comp_filter_factory_) {
+ user_comp_filter_from_factory =
+ user_comp_filter_factory_->CreateCompactionFilter(context);
+ }
+
+ return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
+ ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
+}
+
+Status TtlCompactionFilterFactory::PrepareOptions(
+ const ConfigOptions& config_options) {
+ if (clock_ == nullptr) {
+ clock_ = config_options.env->GetSystemClock().get();
+ }
+ return CompactionFilterFactory::PrepareOptions(config_options);
+}
+
+Status TtlCompactionFilterFactory::ValidateOptions(
+ const DBOptions& db_opts, const ColumnFamilyOptions& cf_opts) const {
+ if (clock_ == nullptr) {
+ return Status::InvalidArgument(
+ "SystemClock required by TtlCompactionFilterFactory");
+ } else {
+ return CompactionFilterFactory::ValidateOptions(db_opts, cf_opts);
+ }
+}
+
+int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/) {
+ library.AddFactory<MergeOperator>(
+ TtlMergeOperator::kClassName(),
+ [](const std::string& /*uri*/, std::unique_ptr<MergeOperator>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new TtlMergeOperator(nullptr, nullptr));
+ return guard->get();
+ });
+ library.AddFactory<CompactionFilterFactory>(
+ TtlCompactionFilterFactory::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new TtlCompactionFilterFactory(0, nullptr, nullptr));
+ return guard->get();
+ });
+ library.AddFactory<CompactionFilter>(
+ TtlCompactionFilter::kClassName(),
+ [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /* errmsg */) {
+ return new TtlCompactionFilter(0, nullptr, nullptr);
+ });
+ size_t num_types;
+ return static_cast<int>(library.GetFactoryCount(&num_types));
+}
+// Open the db inside DBWithTTLImpl because options needs pointer to its ttl
+DBWithTTLImpl::DBWithTTLImpl(DB* db) : DBWithTTL(db), closed_(false) {}
+
+DBWithTTLImpl::~DBWithTTLImpl() {
+ if (!closed_) {
+ Close().PermitUncheckedError();
+ }
+}
+
+Status DBWithTTLImpl::Close() {
+ Status ret = Status::OK();
+ if (!closed_) {
+ Options default_options = GetOptions();
+ // Need to stop background compaction before getting rid of the filter
+ CancelAllBackgroundWork(db_, /* wait = */ true);
+ ret = db_->Close();
+ delete default_options.compaction_filter;
+ closed_ = true;
+ }
+ return ret;
+}
+
+void DBWithTTLImpl::RegisterTtlClasses() {
+ static std::once_flag once;
+ std::call_once(once, [&]() {
+ ObjectRegistry::Default()->AddLibrary("TTL", RegisterTtlObjects, "");
+ });
+}
+
+Status DBWithTTL::Open(const Options& options, const std::string& dbname,
+ DBWithTTL** dbptr, int32_t ttl, bool read_only) {
+ DBOptions db_options(options);
+ ColumnFamilyOptions cf_options(options);
+ std::vector<ColumnFamilyDescriptor> column_families;
+ column_families.push_back(
+ ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+ std::vector<ColumnFamilyHandle*> handles;
+ Status s = DBWithTTL::Open(db_options, dbname, column_families, &handles,
+ dbptr, {ttl}, read_only);
+ if (s.ok()) {
+ assert(handles.size() == 1);
+ // i can delete the handle since DBImpl is always holding a reference to
+ // default column family
+ delete handles[0];
+ }
+ return s;
+}
+
+Status DBWithTTL::Open(
+ const DBOptions& db_options, const std::string& dbname,
+ const std::vector<ColumnFamilyDescriptor>& column_families,
+ std::vector<ColumnFamilyHandle*>* handles, DBWithTTL** dbptr,
+ const std::vector<int32_t>& ttls, bool read_only) {
+ DBWithTTLImpl::RegisterTtlClasses();
+ if (ttls.size() != column_families.size()) {
+ return Status::InvalidArgument(
+ "ttls size has to be the same as number of column families");
+ }
+
+ SystemClock* clock = (db_options.env == nullptr)
+ ? SystemClock::Default().get()
+ : db_options.env->GetSystemClock().get();
+
+ std::vector<ColumnFamilyDescriptor> column_families_sanitized =
+ column_families;
+ for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
+ DBWithTTLImpl::SanitizeOptions(
+ ttls[i], &column_families_sanitized[i].options, clock);
+ }
+ DB* db;
+
+ Status st;
+ if (read_only) {
+ st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
+ handles, &db);
+ } else {
+ st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
+ }
+ if (st.ok()) {
+ *dbptr = new DBWithTTLImpl(db);
+ } else {
+ *dbptr = nullptr;
+ }
+ return st;
+}
+
+Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
+ const ColumnFamilyOptions& options, const std::string& column_family_name,
+ ColumnFamilyHandle** handle, int ttl) {
+ RegisterTtlClasses();
+ ColumnFamilyOptions sanitized_options = options;
+ DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
+ GetEnv()->GetSystemClock().get());
+
+ return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
+ handle);
+}
+
+Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
+ const std::string& column_family_name,
+ ColumnFamilyHandle** handle) {
+ return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0);
+}
+
+// Appends the current timestamp to the string.
+// Returns false if could not get the current_time, true if append succeeds
+Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
+ SystemClock* clock) {
+ val_with_ts->reserve(kTSLength + val.size());
+ char ts_string[kTSLength];
+ int64_t curtime;
+ Status st = clock->GetCurrentTime(&curtime);
+ if (!st.ok()) {
+ return st;
+ }
+ EncodeFixed32(ts_string, (int32_t)curtime);
+ val_with_ts->append(val.data(), val.size());
+ val_with_ts->append(ts_string, kTSLength);
+ return st;
+}
+
+// Returns corruption if the length of the string is lesser than timestamp, or
+// timestamp refers to a time lesser than ttl-feature release time
+Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
+ if (str.size() < kTSLength) {
+ return Status::Corruption("Error: value's length less than timestamp's\n");
+ }
+ // Checks that TS is not lesser than kMinTimestamp
+ // Gaurds against corruption & normal database opened incorrectly in ttl mode
+ int32_t timestamp_value = DecodeFixed32(str.data() + str.size() - kTSLength);
+ if (timestamp_value < kMinTimestamp) {
+ return Status::Corruption("Error: Timestamp < ttl feature release time!\n");
+ }
+ return Status::OK();
+}
+
+// Checks if the string is stale or not according to TTl provided
+bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl,
+ SystemClock* clock) {
+ if (ttl <= 0) { // Data is fresh if TTL is non-positive
+ return false;
+ }
+ int64_t curtime;
+ if (!clock->GetCurrentTime(&curtime).ok()) {
+ return false; // Treat the data as fresh if could not get current time
+ }
+ int32_t timestamp_value =
+ DecodeFixed32(value.data() + value.size() - kTSLength);
+ return (timestamp_value + ttl) < curtime;
+}
+
+// Strips the TS from the end of the slice
+Status DBWithTTLImpl::StripTS(PinnableSlice* pinnable_val) {
+ if (pinnable_val->size() < kTSLength) {
+ return Status::Corruption("Bad timestamp in key-value");
+ }
+ // Erasing characters which hold the TS
+ pinnable_val->remove_suffix(kTSLength);
+ return Status::OK();
+}
+
+// Strips the TS from the end of the string
+Status DBWithTTLImpl::StripTS(std::string* str) {
+ if (str->length() < kTSLength) {
+ return Status::Corruption("Bad timestamp in key-value");
+ }
+ // Erasing characters which hold the TS
+ str->erase(str->length() - kTSLength, kTSLength);
+ return Status::OK();
+}
+
+Status DBWithTTLImpl::Put(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& val) {
+ WriteBatch batch;
+ Status st = batch.Put(column_family, key, val);
+ if (st.ok()) {
+ st = Write(options, &batch);
+ }
+ return st;
+}
+
+Status DBWithTTLImpl::Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) {
+ Status st = db_->Get(options, column_family, key, value);
+ if (!st.ok()) {
+ return st;
+ }
+ st = SanityCheckTimestamp(*value);
+ if (!st.ok()) {
+ return st;
+ }
+ return StripTS(value);
+}
+
+std::vector<Status> DBWithTTLImpl::MultiGet(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_family,
+ const std::vector<Slice>& keys, std::vector<std::string>* values) {
+ auto statuses = db_->MultiGet(options, column_family, keys, values);
+ for (size_t i = 0; i < keys.size(); ++i) {
+ if (!statuses[i].ok()) {
+ continue;
+ }
+ statuses[i] = SanityCheckTimestamp((*values)[i]);
+ if (!statuses[i].ok()) {
+ continue;
+ }
+ statuses[i] = StripTS(&(*values)[i]);
+ }
+ return statuses;
+}
+
+bool DBWithTTLImpl::KeyMayExist(const ReadOptions& options,
+ ColumnFamilyHandle* column_family,
+ const Slice& key, std::string* value,
+ bool* value_found) {
+ bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
+ if (ret && value != nullptr && value_found != nullptr && *value_found) {
+ if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
+ return false;
+ }
+ }
+ return ret;
+}
+
+Status DBWithTTLImpl::Merge(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) {
+ WriteBatch batch;
+ Status st = batch.Merge(column_family, key, value);
+ if (st.ok()) {
+ st = Write(options, &batch);
+ }
+ return st;
+}
+
+Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
+ class Handler : public WriteBatch::Handler {
+ public:
+ explicit Handler(SystemClock* clock) : clock_(clock) {}
+ WriteBatch updates_ttl;
+ Status PutCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override {
+ std::string value_with_ts;
+ Status st = AppendTS(value, &value_with_ts, clock_);
+ if (!st.ok()) {
+ return st;
+ }
+ return WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
+ value_with_ts);
+ }
+ Status MergeCF(uint32_t column_family_id, const Slice& key,
+ const Slice& value) override {
+ std::string value_with_ts;
+ Status st = AppendTS(value, &value_with_ts, clock_);
+ if (!st.ok()) {
+ return st;
+ }
+ return WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
+ value_with_ts);
+ }
+ Status DeleteCF(uint32_t column_family_id, const Slice& key) override {
+ return WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
+ }
+ Status DeleteRangeCF(uint32_t column_family_id, const Slice& begin_key,
+ const Slice& end_key) override {
+ return WriteBatchInternal::DeleteRange(&updates_ttl, column_family_id,
+ begin_key, end_key);
+ }
+ void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
+
+ private:
+ SystemClock* clock_;
+ };
+ Handler handler(GetEnv()->GetSystemClock().get());
+ Status st = updates->Iterate(&handler);
+ if (!st.ok()) {
+ return st;
+ } else {
+ return db_->Write(opts, &(handler.updates_ttl));
+ }
+}
+
+Iterator* DBWithTTLImpl::NewIterator(const ReadOptions& opts,
+ ColumnFamilyHandle* column_family) {
+ return new TtlIterator(db_->NewIterator(opts, column_family));
+}
+
+void DBWithTTLImpl::SetTtl(ColumnFamilyHandle* h, int32_t ttl) {
+ std::shared_ptr<TtlCompactionFilterFactory> filter;
+ Options opts;
+ opts = GetOptions(h);
+ filter = std::static_pointer_cast<TtlCompactionFilterFactory>(
+ opts.compaction_filter_factory);
+ if (!filter) return;
+ filter->SetTtl(ttl);
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/ttl/db_ttl_impl.h b/src/rocksdb/utilities/ttl/db_ttl_impl.h
new file mode 100644
index 000000000..dd67a6ddc
--- /dev/null
+++ b/src/rocksdb/utilities/ttl/db_ttl_impl.h
@@ -0,0 +1,245 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#pragma once
+
+#ifndef ROCKSDB_LITE
+#include <deque>
+#include <string>
+#include <vector>
+
+#include "db/db_impl/db_impl.h"
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/db.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/system_clock.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "utilities/compaction_filters/layered_compaction_filter_base.h"
+
+#ifdef _WIN32
+// Windows API macro interference
+#undef GetCurrentTime
+#endif
+
+namespace ROCKSDB_NAMESPACE {
+struct ConfigOptions;
+class ObjectLibrary;
+class ObjectRegistry;
+class DBWithTTLImpl : public DBWithTTL {
+ public:
+ static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
+ SystemClock* clock);
+
+ static void RegisterTtlClasses();
+ explicit DBWithTTLImpl(DB* db);
+
+ virtual ~DBWithTTLImpl();
+
+ virtual Status Close() override;
+
+ Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options,
+ const std::string& column_family_name,
+ ColumnFamilyHandle** handle,
+ int ttl) override;
+
+ Status CreateColumnFamily(const ColumnFamilyOptions& options,
+ const std::string& column_family_name,
+ ColumnFamilyHandle** handle) override;
+
+ using StackableDB::Put;
+ virtual Status Put(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& val) override;
+
+ using StackableDB::Get;
+ virtual Status Get(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ PinnableSlice* value) override;
+
+ using StackableDB::MultiGet;
+ virtual std::vector<Status> MultiGet(
+ const ReadOptions& options,
+ const std::vector<ColumnFamilyHandle*>& column_family,
+ const std::vector<Slice>& keys,
+ std::vector<std::string>* values) override;
+
+ using StackableDB::KeyMayExist;
+ virtual bool KeyMayExist(const ReadOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ std::string* value,
+ bool* value_found = nullptr) override;
+
+ using StackableDB::Merge;
+ virtual Status Merge(const WriteOptions& options,
+ ColumnFamilyHandle* column_family, const Slice& key,
+ const Slice& value) override;
+
+ virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
+
+ using StackableDB::NewIterator;
+ virtual Iterator* NewIterator(const ReadOptions& opts,
+ ColumnFamilyHandle* column_family) override;
+
+ virtual DB* GetBaseDB() override { return db_; }
+
+ static bool IsStale(const Slice& value, int32_t ttl, SystemClock* clock);
+
+ static Status AppendTS(const Slice& val, std::string* val_with_ts,
+ SystemClock* clock);
+
+ static Status SanityCheckTimestamp(const Slice& str);
+
+ static Status StripTS(std::string* str);
+
+ static Status StripTS(PinnableSlice* str);
+
+ static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
+
+ static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
+
+ static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
+
+ void SetTtl(int32_t ttl) override { SetTtl(DefaultColumnFamily(), ttl); }
+
+ void SetTtl(ColumnFamilyHandle* h, int32_t ttl) override;
+
+ private:
+ // remember whether the Close completes or not
+ bool closed_;
+};
+
+class TtlIterator : public Iterator {
+ public:
+ explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); }
+
+ ~TtlIterator() { delete iter_; }
+
+ bool Valid() const override { return iter_->Valid(); }
+
+ void SeekToFirst() override { iter_->SeekToFirst(); }
+
+ void SeekToLast() override { iter_->SeekToLast(); }
+
+ void Seek(const Slice& target) override { iter_->Seek(target); }
+
+ void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
+
+ void Next() override { iter_->Next(); }
+
+ void Prev() override { iter_->Prev(); }
+
+ Slice key() const override { return iter_->key(); }
+
+ int32_t ttl_timestamp() const {
+ return DecodeFixed32(iter_->value().data() + iter_->value().size() -
+ DBWithTTLImpl::kTSLength);
+ }
+
+ Slice value() const override {
+ // TODO: handle timestamp corruption like in general iterator semantics
+ assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok());
+ Slice trimmed_value = iter_->value();
+ trimmed_value.size_ -= DBWithTTLImpl::kTSLength;
+ return trimmed_value;
+ }
+
+ Status status() const override { return iter_->status(); }
+
+ private:
+ Iterator* iter_;
+};
+
+class TtlCompactionFilter : public LayeredCompactionFilterBase {
+ public:
+ TtlCompactionFilter(int32_t ttl, SystemClock* clock,
+ const CompactionFilter* _user_comp_filter,
+ std::unique_ptr<const CompactionFilter>
+ _user_comp_filter_from_factory = nullptr);
+
+ virtual bool Filter(int level, const Slice& key, const Slice& old_val,
+ std::string* new_val, bool* value_changed) const override;
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "TtlCompactionFilter"; }
+ bool IsInstanceOf(const std::string& name) const override {
+ if (name == "Delete By TTL") {
+ return true;
+ } else {
+ return LayeredCompactionFilterBase::IsInstanceOf(name);
+ }
+ }
+
+ Status PrepareOptions(const ConfigOptions& config_options) override;
+ Status ValidateOptions(const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts) const override;
+
+ private:
+ int32_t ttl_;
+ SystemClock* clock_;
+};
+
+class TtlCompactionFilterFactory : public CompactionFilterFactory {
+ public:
+ TtlCompactionFilterFactory(
+ int32_t ttl, SystemClock* clock,
+ std::shared_ptr<CompactionFilterFactory> comp_filter_factory);
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& context) override;
+ void SetTtl(int32_t ttl) { ttl_ = ttl; }
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "TtlCompactionFilterFactory"; }
+ Status PrepareOptions(const ConfigOptions& config_options) override;
+ Status ValidateOptions(const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts) const override;
+ const Customizable* Inner() const override {
+ return user_comp_filter_factory_.get();
+ }
+
+ private:
+ int32_t ttl_;
+ SystemClock* clock_;
+ std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
+};
+
+class TtlMergeOperator : public MergeOperator {
+ public:
+ explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
+ SystemClock* clock);
+
+ bool FullMergeV2(const MergeOperationInput& merge_in,
+ MergeOperationOutput* merge_out) const override;
+
+ bool PartialMergeMulti(const Slice& key,
+ const std::deque<Slice>& operand_list,
+ std::string* new_value, Logger* logger) const override;
+
+ static const char* kClassName() { return "TtlMergeOperator"; }
+
+ const char* Name() const override { return kClassName(); }
+ bool IsInstanceOf(const std::string& name) const override {
+ if (name == "Merge By TTL") {
+ return true;
+ } else {
+ return MergeOperator::IsInstanceOf(name);
+ }
+ }
+
+ Status PrepareOptions(const ConfigOptions& config_options) override;
+ Status ValidateOptions(const DBOptions& db_opts,
+ const ColumnFamilyOptions& cf_opts) const override;
+ const Customizable* Inner() const override { return user_merge_op_.get(); }
+
+ private:
+ std::shared_ptr<MergeOperator> user_merge_op_;
+ SystemClock* clock_;
+};
+extern "C" {
+int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/);
+} // extern "C"
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE
diff --git a/src/rocksdb/utilities/ttl/ttl_test.cc b/src/rocksdb/utilities/ttl/ttl_test.cc
new file mode 100644
index 000000000..a42e0acb4
--- /dev/null
+++ b/src/rocksdb/utilities/ttl/ttl_test.cc
@@ -0,0 +1,912 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef ROCKSDB_LITE
+
+#include <map>
+#include <memory>
+
+#include "rocksdb/compaction_filter.h"
+#include "rocksdb/convenience.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/utilities/db_ttl.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "test_util/testharness.h"
+#include "util/string_util.h"
+#include "utilities/merge_operators/bytesxor.h"
+#include "utilities/ttl/db_ttl_impl.h"
+#ifndef OS_WIN
+#include <unistd.h>
+#endif
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+using KVMap = std::map<std::string, std::string>;
+
+enum BatchOperation { OP_PUT = 0, OP_DELETE = 1 };
+} // namespace
+
+class SpecialTimeEnv : public EnvWrapper {
+ public:
+ explicit SpecialTimeEnv(Env* base) : EnvWrapper(base) {
+ EXPECT_OK(base->GetCurrentTime(&current_time_));
+ }
+ const char* Name() const override { return "SpecialTimeEnv"; }
+ void Sleep(int64_t sleep_time) { current_time_ += sleep_time; }
+ Status GetCurrentTime(int64_t* current_time) override {
+ *current_time = current_time_;
+ return Status::OK();
+ }
+
+ private:
+ int64_t current_time_ = 0;
+};
+
+class TtlTest : public testing::Test {
+ public:
+ TtlTest() {
+ env_.reset(new SpecialTimeEnv(Env::Default()));
+ dbname_ = test::PerThreadDBPath("db_ttl");
+ options_.create_if_missing = true;
+ options_.env = env_.get();
+ // ensure that compaction is kicked in to always strip timestamp from kvs
+ options_.max_compaction_bytes = 1;
+ // compaction should take place always from level0 for determinism
+ db_ttl_ = nullptr;
+ EXPECT_OK(DestroyDB(dbname_, Options()));
+ }
+
+ ~TtlTest() override {
+ CloseTtl();
+ EXPECT_OK(DestroyDB(dbname_, Options()));
+ }
+
+ // Open database with TTL support when TTL not provided with db_ttl_ pointer
+ void OpenTtl() {
+ ASSERT_TRUE(db_ttl_ ==
+ nullptr); // db should be closed before opening again
+ ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_));
+ }
+
+ // Open database with TTL support when TTL provided with db_ttl_ pointer
+ void OpenTtl(int32_t ttl) {
+ ASSERT_TRUE(db_ttl_ == nullptr);
+ ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl));
+ }
+
+ // Open with TestFilter compaction filter
+ void OpenTtlWithTestCompaction(int32_t ttl) {
+ options_.compaction_filter_factory =
+ std::shared_ptr<CompactionFilterFactory>(
+ new TestFilterFactory(kSampleSize_, kNewValue_));
+ OpenTtl(ttl);
+ }
+
+ // Open database with TTL support in read_only mode
+ void OpenReadOnlyTtl(int32_t ttl) {
+ ASSERT_TRUE(db_ttl_ == nullptr);
+ ASSERT_OK(DBWithTTL::Open(options_, dbname_, &db_ttl_, ttl, true));
+ }
+
+ // Call db_ttl_->Close() before delete db_ttl_
+ void CloseTtl() { CloseTtlHelper(true); }
+
+ // No db_ttl_->Close() before delete db_ttl_
+ void CloseTtlNoDBClose() { CloseTtlHelper(false); }
+
+ void CloseTtlHelper(bool close_db) {
+ if (db_ttl_ != nullptr) {
+ if (close_db) {
+ EXPECT_OK(db_ttl_->Close());
+ }
+ delete db_ttl_;
+ db_ttl_ = nullptr;
+ }
+ }
+
+ // Populates and returns a kv-map
+ void MakeKVMap(int64_t num_entries) {
+ kvmap_.clear();
+ int digits = 1;
+ for (int64_t dummy = num_entries; dummy /= 10; ++digits) {
+ }
+ int digits_in_i = 1;
+ for (int64_t i = 0; i < num_entries; i++) {
+ std::string key = "key";
+ std::string value = "value";
+ if (i % 10 == 0) {
+ digits_in_i++;
+ }
+ for (int j = digits_in_i; j < digits; j++) {
+ key.append("0");
+ value.append("0");
+ }
+ AppendNumberTo(&key, i);
+ AppendNumberTo(&value, i);
+ kvmap_[key] = value;
+ }
+ ASSERT_EQ(static_cast<int64_t>(kvmap_.size()),
+ num_entries); // check all insertions done
+ }
+
+ // Makes a write-batch with key-vals from kvmap_ and 'Write''s it
+ void MakePutWriteBatch(const BatchOperation* batch_ops, int64_t num_ops) {
+ ASSERT_LE(num_ops, static_cast<int64_t>(kvmap_.size()));
+ static WriteOptions wopts;
+ static FlushOptions flush_opts;
+ WriteBatch batch;
+ kv_it_ = kvmap_.begin();
+ for (int64_t i = 0; i < num_ops && kv_it_ != kvmap_.end(); i++, ++kv_it_) {
+ switch (batch_ops[i]) {
+ case OP_PUT:
+ ASSERT_OK(batch.Put(kv_it_->first, kv_it_->second));
+ break;
+ case OP_DELETE:
+ ASSERT_OK(batch.Delete(kv_it_->first));
+ break;
+ default:
+ FAIL();
+ }
+ }
+ ASSERT_OK(db_ttl_->Write(wopts, &batch));
+ ASSERT_OK(db_ttl_->Flush(flush_opts));
+ }
+
+ // Puts num_entries starting from start_pos_map from kvmap_ into the database
+ void PutValues(int64_t start_pos_map, int64_t num_entries, bool flush = true,
+ ColumnFamilyHandle* cf = nullptr) {
+ ASSERT_TRUE(db_ttl_);
+ ASSERT_LE(start_pos_map + num_entries, static_cast<int64_t>(kvmap_.size()));
+ static WriteOptions wopts;
+ static FlushOptions flush_opts;
+ kv_it_ = kvmap_.begin();
+ advance(kv_it_, start_pos_map);
+ for (int64_t i = 0; kv_it_ != kvmap_.end() && i < num_entries;
+ i++, ++kv_it_) {
+ ASSERT_OK(cf == nullptr
+ ? db_ttl_->Put(wopts, kv_it_->first, kv_it_->second)
+ : db_ttl_->Put(wopts, cf, kv_it_->first, kv_it_->second));
+ }
+ // Put a mock kv at the end because CompactionFilter doesn't delete last key
+ ASSERT_OK(cf == nullptr ? db_ttl_->Put(wopts, "keymock", "valuemock")
+ : db_ttl_->Put(wopts, cf, "keymock", "valuemock"));
+ if (flush) {
+ if (cf == nullptr) {
+ ASSERT_OK(db_ttl_->Flush(flush_opts));
+ } else {
+ ASSERT_OK(db_ttl_->Flush(flush_opts, cf));
+ }
+ }
+ }
+
+ // Runs a manual compaction
+ Status ManualCompact(ColumnFamilyHandle* cf = nullptr) {
+ assert(db_ttl_);
+ if (cf == nullptr) {
+ return db_ttl_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
+ } else {
+ return db_ttl_->CompactRange(CompactRangeOptions(), cf, nullptr, nullptr);
+ }
+ }
+
+ // Runs a DeleteRange
+ void MakeDeleteRange(std::string start, std::string end,
+ ColumnFamilyHandle* cf = nullptr) {
+ ASSERT_TRUE(db_ttl_);
+ static WriteOptions wops;
+ WriteBatch wb;
+ ASSERT_OK(cf == nullptr
+ ? wb.DeleteRange(db_ttl_->DefaultColumnFamily(), start, end)
+ : wb.DeleteRange(cf, start, end));
+ ASSERT_OK(db_ttl_->Write(wops, &wb));
+ }
+
+ // checks the whole kvmap_ to return correct values using KeyMayExist
+ void SimpleKeyMayExistCheck() {
+ static ReadOptions ropts;
+ bool value_found;
+ std::string val;
+ for (auto& kv : kvmap_) {
+ bool ret = db_ttl_->KeyMayExist(ropts, kv.first, &val, &value_found);
+ if (ret == false || value_found == false) {
+ fprintf(stderr,
+ "KeyMayExist could not find key=%s in the database but"
+ " should have\n",
+ kv.first.c_str());
+ FAIL();
+ } else if (val.compare(kv.second) != 0) {
+ fprintf(stderr,
+ " value for key=%s present in database is %s but"
+ " should be %s\n",
+ kv.first.c_str(), val.c_str(), kv.second.c_str());
+ FAIL();
+ }
+ }
+ }
+
+ // checks the whole kvmap_ to return correct values using MultiGet
+ void SimpleMultiGetTest() {
+ static ReadOptions ropts;
+ std::vector<Slice> keys;
+ std::vector<std::string> values;
+
+ for (auto& kv : kvmap_) {
+ keys.emplace_back(kv.first);
+ }
+
+ auto statuses = db_ttl_->MultiGet(ropts, keys, &values);
+ size_t i = 0;
+ for (auto& kv : kvmap_) {
+ ASSERT_OK(statuses[i]);
+ ASSERT_EQ(values[i], kv.second);
+ ++i;
+ }
+ }
+
+ void CompactCheck(int64_t st_pos, int64_t span, bool check = true,
+ bool test_compaction_change = false,
+ ColumnFamilyHandle* cf = nullptr) {
+ static ReadOptions ropts;
+ kv_it_ = kvmap_.begin();
+ advance(kv_it_, st_pos);
+ std::string v;
+ for (int64_t i = 0; kv_it_ != kvmap_.end() && i < span; i++, ++kv_it_) {
+ Status s = (cf == nullptr) ? db_ttl_->Get(ropts, kv_it_->first, &v)
+ : db_ttl_->Get(ropts, cf, kv_it_->first, &v);
+ if (s.ok() != check) {
+ fprintf(stderr, "key=%s ", kv_it_->first.c_str());
+ if (!s.ok()) {
+ fprintf(stderr, "is absent from db but was expected to be present\n");
+ } else {
+ fprintf(stderr, "is present in db but was expected to be absent\n");
+ }
+ FAIL();
+ } else if (s.ok()) {
+ if (test_compaction_change && v.compare(kNewValue_) != 0) {
+ fprintf(stderr,
+ " value for key=%s present in database is %s but "
+ " should be %s\n",
+ kv_it_->first.c_str(), v.c_str(), kNewValue_.c_str());
+ FAIL();
+ } else if (!test_compaction_change && v.compare(kv_it_->second) != 0) {
+ fprintf(stderr,
+ " value for key=%s present in database is %s but "
+ " should be %s\n",
+ kv_it_->first.c_str(), v.c_str(), kv_it_->second.c_str());
+ FAIL();
+ }
+ }
+ }
+ }
+ // Sleeps for slp_tim then runs a manual compaction
+ // Checks span starting from st_pos from kvmap_ in the db and
+ // Gets should return true if check is true and false otherwise
+ // Also checks that value that we got is the same as inserted; and =kNewValue
+ // if test_compaction_change is true
+ void SleepCompactCheck(int slp_tim, int64_t st_pos, int64_t span,
+ bool check = true, bool test_compaction_change = false,
+ ColumnFamilyHandle* cf = nullptr) {
+ ASSERT_TRUE(db_ttl_);
+
+ env_->Sleep(slp_tim);
+ ASSERT_OK(ManualCompact(cf));
+ CompactCheck(st_pos, span, check, test_compaction_change, cf);
+ }
+
+ // Similar as SleepCompactCheck but uses TtlIterator to read from db
+ void SleepCompactCheckIter(int slp, int st_pos, int64_t span,
+ bool check = true) {
+ ASSERT_TRUE(db_ttl_);
+ env_->Sleep(slp);
+ ASSERT_OK(ManualCompact());
+ static ReadOptions ropts;
+ Iterator* dbiter = db_ttl_->NewIterator(ropts);
+ kv_it_ = kvmap_.begin();
+ advance(kv_it_, st_pos);
+
+ dbiter->Seek(kv_it_->first);
+ if (!check) {
+ if (dbiter->Valid()) {
+ ASSERT_NE(dbiter->value().compare(kv_it_->second), 0);
+ }
+ } else { // dbiter should have found out kvmap_[st_pos]
+ for (int64_t i = st_pos; kv_it_ != kvmap_.end() && i < st_pos + span;
+ i++, ++kv_it_) {
+ ASSERT_TRUE(dbiter->Valid());
+ ASSERT_EQ(dbiter->value().compare(kv_it_->second), 0);
+ dbiter->Next();
+ }
+ }
+ ASSERT_OK(dbiter->status());
+ delete dbiter;
+ }
+
+ // Set ttl on open db
+ void SetTtl(int32_t ttl, ColumnFamilyHandle* cf = nullptr) {
+ ASSERT_TRUE(db_ttl_);
+ cf == nullptr ? db_ttl_->SetTtl(ttl) : db_ttl_->SetTtl(cf, ttl);
+ }
+
+ class TestFilter : public CompactionFilter {
+ public:
+ TestFilter(const int64_t kSampleSize, const std::string& kNewValue)
+ : kSampleSize_(kSampleSize), kNewValue_(kNewValue) {}
+
+ // Works on keys of the form "key<number>"
+ // Drops key if number at the end of key is in [0, kSampleSize_/3),
+ // Keeps key if it is in [kSampleSize_/3, 2*kSampleSize_/3),
+ // Change value if it is in [2*kSampleSize_/3, kSampleSize_)
+ // Eg. kSampleSize_=6. Drop:key0-1...Keep:key2-3...Change:key4-5...
+ bool Filter(int /*level*/, const Slice& key, const Slice& /*value*/,
+ std::string* new_value, bool* value_changed) const override {
+ assert(new_value != nullptr);
+
+ std::string search_str = "0123456789";
+ std::string key_string = key.ToString();
+ size_t pos = key_string.find_first_of(search_str);
+ int num_key_end;
+ if (pos != std::string::npos) {
+ auto key_substr = key_string.substr(pos, key.size() - pos);
+#ifndef CYGWIN
+ num_key_end = std::stoi(key_substr);
+#else
+ num_key_end = std::strtol(key_substr.c_str(), 0, 10);
+#endif
+
+ } else {
+ return false; // Keep keys not matching the format "key<NUMBER>"
+ }
+
+ int64_t partition = kSampleSize_ / 3;
+ if (num_key_end < partition) {
+ return true;
+ } else if (num_key_end < partition * 2) {
+ return false;
+ } else {
+ *new_value = kNewValue_;
+ *value_changed = true;
+ return false;
+ }
+ }
+
+ const char* Name() const override { return "TestFilter"; }
+
+ private:
+ const int64_t kSampleSize_;
+ const std::string kNewValue_;
+ };
+
+ class TestFilterFactory : public CompactionFilterFactory {
+ public:
+ TestFilterFactory(const int64_t kSampleSize, const std::string& kNewValue)
+ : kSampleSize_(kSampleSize), kNewValue_(kNewValue) {}
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context& /*context*/) override {
+ return std::unique_ptr<CompactionFilter>(
+ new TestFilter(kSampleSize_, kNewValue_));
+ }
+
+ const char* Name() const override { return "TestFilterFactory"; }
+
+ private:
+ const int64_t kSampleSize_;
+ const std::string kNewValue_;
+ };
+
+ // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer
+ static const int64_t kSampleSize_ = 100;
+ std::string dbname_;
+ DBWithTTL* db_ttl_;
+ std::unique_ptr<SpecialTimeEnv> env_;
+
+ private:
+ Options options_;
+ KVMap kvmap_;
+ KVMap::iterator kv_it_;
+ const std::string kNewValue_ = "new_value";
+ std::unique_ptr<CompactionFilter> test_comp_filter_;
+}; // class TtlTest
+
+// If TTL is non positive or not provided, the behaviour is TTL = infinity
+// This test opens the db 3 times with such default behavior and inserts a
+// bunch of kvs each time. All kvs should accumulate in the db till the end
+// Partitions the sample-size provided into 3 sets over boundary1 and boundary2
+TEST_F(TtlTest, NoEffect) {
+ MakeKVMap(kSampleSize_);
+ int64_t boundary1 = kSampleSize_ / 3;
+ int64_t boundary2 = 2 * boundary1;
+
+ OpenTtl();
+ PutValues(0, boundary1); // T=0: Set1 never deleted
+ SleepCompactCheck(1, 0, boundary1); // T=1: Set1 still there
+ CloseTtl();
+
+ OpenTtl(0);
+ PutValues(boundary1, boundary2 - boundary1); // T=1: Set2 never deleted
+ SleepCompactCheck(1, 0, boundary2); // T=2: Sets1 & 2 still there
+ CloseTtl();
+
+ OpenTtl(-1);
+ PutValues(boundary2, kSampleSize_ - boundary2); // T=3: Set3 never deleted
+ SleepCompactCheck(1, 0, kSampleSize_, true); // T=4: Sets 1,2,3 still there
+ CloseTtl();
+}
+
+// Rerun the NoEffect test with a different version of CloseTtl
+// function, where db is directly deleted without close.
+TEST_F(TtlTest, DestructWithoutClose) {
+ MakeKVMap(kSampleSize_);
+ int64_t boundary1 = kSampleSize_ / 3;
+ int64_t boundary2 = 2 * boundary1;
+
+ OpenTtl();
+ PutValues(0, boundary1); // T=0: Set1 never deleted
+ SleepCompactCheck(1, 0, boundary1); // T=1: Set1 still there
+ CloseTtlNoDBClose();
+
+ OpenTtl(0);
+ PutValues(boundary1, boundary2 - boundary1); // T=1: Set2 never deleted
+ SleepCompactCheck(1, 0, boundary2); // T=2: Sets1 & 2 still there
+ CloseTtlNoDBClose();
+
+ OpenTtl(-1);
+ PutValues(boundary2, kSampleSize_ - boundary2); // T=3: Set3 never deleted
+ SleepCompactCheck(1, 0, kSampleSize_, true); // T=4: Sets 1,2,3 still there
+ CloseTtlNoDBClose();
+}
+
+// Puts a set of values and checks its presence using Get during ttl
+TEST_F(TtlTest, PresentDuringTTL) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(2); // T=0:Open the db with ttl = 2
+ PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
+ SleepCompactCheck(1, 0, kSampleSize_,
+ true); // T=1:Set1 should still be there
+ CloseTtl();
+}
+
+// Puts a set of values and checks its absence using Get after ttl
+TEST_F(TtlTest, AbsentAfterTTL) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(1); // T=0:Open the db with ttl = 2
+ PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
+ SleepCompactCheck(2, 0, kSampleSize_, false); // T=2:Set1 should not be there
+ CloseTtl();
+}
+
+// Resets the timestamp of a set of kvs by updating them and checks that they
+// are not deleted according to the old timestamp
+TEST_F(TtlTest, ResetTimestamp) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(3);
+ PutValues(0, kSampleSize_); // T=0: Insert Set1. Delete at t=3
+ env_->Sleep(2); // T=2
+ PutValues(0, kSampleSize_); // T=2: Insert Set1. Delete at t=5
+ SleepCompactCheck(2, 0, kSampleSize_); // T=4: Set1 should still be there
+ CloseTtl();
+}
+
+// Similar to PresentDuringTTL but uses Iterator
+TEST_F(TtlTest, IterPresentDuringTTL) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(2);
+ PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=2
+ SleepCompactCheckIter(1, 0, kSampleSize_); // T=1: Set should be there
+ CloseTtl();
+}
+
+// Similar to AbsentAfterTTL but uses Iterator
+TEST_F(TtlTest, IterAbsentAfterTTL) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(1);
+ PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
+ SleepCompactCheckIter(2, 0, kSampleSize_, false); // T=2: Should not be there
+ CloseTtl();
+}
+
+// Checks presence while opening the same db more than once with the same ttl
+// Note: The second open will open the same db
+TEST_F(TtlTest, MultiOpenSamePresent) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(2);
+ PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=2
+ CloseTtl();
+
+ OpenTtl(2); // T=0. Delete at t=2
+ SleepCompactCheck(1, 0, kSampleSize_); // T=1: Set should be there
+ CloseTtl();
+}
+
+// Checks absence while opening the same db more than once with the same ttl
+// Note: The second open will open the same db
+TEST_F(TtlTest, MultiOpenSameAbsent) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(1);
+ PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
+ CloseTtl();
+
+ OpenTtl(1); // T=0.Delete at t=1
+ SleepCompactCheck(2, 0, kSampleSize_, false); // T=2: Set should not be there
+ CloseTtl();
+}
+
+// Checks presence while opening the same db more than once with bigger ttl
+TEST_F(TtlTest, MultiOpenDifferent) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(1);
+ PutValues(0, kSampleSize_); // T=0: Insert. Delete at t=1
+ CloseTtl();
+
+ OpenTtl(3); // T=0: Set deleted at t=3
+ SleepCompactCheck(2, 0, kSampleSize_); // T=2: Set should be there
+ CloseTtl();
+}
+
+// Checks presence during ttl in read_only mode
+TEST_F(TtlTest, ReadOnlyPresentForever) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(1); // T=0:Open the db normally
+ PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1
+ CloseTtl();
+
+ OpenReadOnlyTtl(1);
+ ASSERT_TRUE(db_ttl_);
+
+ env_->Sleep(2);
+ Status s = ManualCompact(); // T=2:Set1 should still be there
+ ASSERT_TRUE(s.IsNotSupported());
+ CompactCheck(0, kSampleSize_);
+ CloseTtl();
+}
+
+// Checks whether WriteBatch works well with TTL
+// Puts all kvs in kvmap_ in a batch and writes first, then deletes first half
+TEST_F(TtlTest, WriteBatchTest) {
+ MakeKVMap(kSampleSize_);
+ BatchOperation batch_ops[kSampleSize_];
+ for (int i = 0; i < kSampleSize_; i++) {
+ batch_ops[i] = OP_PUT;
+ }
+
+ OpenTtl(2);
+ MakePutWriteBatch(batch_ops, kSampleSize_);
+ for (int i = 0; i < kSampleSize_ / 2; i++) {
+ batch_ops[i] = OP_DELETE;
+ }
+ MakePutWriteBatch(batch_ops, kSampleSize_ / 2);
+ SleepCompactCheck(0, 0, kSampleSize_ / 2, false);
+ SleepCompactCheck(0, kSampleSize_ / 2, kSampleSize_ - kSampleSize_ / 2);
+ CloseTtl();
+}
+
+// Checks user's compaction filter for correctness with TTL logic
+TEST_F(TtlTest, CompactionFilter) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtlWithTestCompaction(1);
+ PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=1
+ // T=2: TTL logic takes precedence over TestFilter:-Set1 should not be there
+ SleepCompactCheck(2, 0, kSampleSize_, false);
+ CloseTtl();
+
+ OpenTtlWithTestCompaction(3);
+ PutValues(0, kSampleSize_); // T=0:Insert Set1.
+ int64_t partition = kSampleSize_ / 3;
+ SleepCompactCheck(1, 0, partition, false); // Part dropped
+ SleepCompactCheck(0, partition, partition); // Part kept
+ SleepCompactCheck(0, 2 * partition, partition, true, true); // Part changed
+ CloseTtl();
+}
+
+// Insert some key-values which KeyMayExist should be able to get and check that
+// values returned are fine
+TEST_F(TtlTest, KeyMayExist) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl();
+ PutValues(0, kSampleSize_, false);
+
+ SimpleKeyMayExistCheck();
+
+ CloseTtl();
+}
+
+TEST_F(TtlTest, MultiGetTest) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl();
+ PutValues(0, kSampleSize_, false);
+
+ SimpleMultiGetTest();
+
+ CloseTtl();
+}
+
+TEST_F(TtlTest, ColumnFamiliesTest) {
+ DB* db;
+ Options options;
+ options.create_if_missing = true;
+ options.env = env_.get();
+
+ DB::Open(options, dbname_, &db);
+ ColumnFamilyHandle* handle;
+ ASSERT_OK(db->CreateColumnFamily(ColumnFamilyOptions(options),
+ "ttl_column_family", &handle));
+
+ delete handle;
+ delete db;
+
+ std::vector<ColumnFamilyDescriptor> column_families;
+ column_families.push_back(ColumnFamilyDescriptor(
+ kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
+ column_families.push_back(ColumnFamilyDescriptor(
+ "ttl_column_family", ColumnFamilyOptions(options)));
+
+ std::vector<ColumnFamilyHandle*> handles;
+
+ ASSERT_OK(DBWithTTL::Open(DBOptions(options), dbname_, column_families,
+ &handles, &db_ttl_, {3, 5}, false));
+ ASSERT_EQ(handles.size(), 2U);
+ ColumnFamilyHandle* new_handle;
+ ASSERT_OK(db_ttl_->CreateColumnFamilyWithTtl(options, "ttl_column_family_2",
+ &new_handle, 2));
+ handles.push_back(new_handle);
+
+ MakeKVMap(kSampleSize_);
+ PutValues(0, kSampleSize_, false, handles[0]);
+ PutValues(0, kSampleSize_, false, handles[1]);
+ PutValues(0, kSampleSize_, false, handles[2]);
+
+ // everything should be there after 1 second
+ SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]);
+ SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
+ SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[2]);
+
+ // only column family 1 should be alive after 4 seconds
+ SleepCompactCheck(3, 0, kSampleSize_, false, false, handles[0]);
+ SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
+ SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]);
+
+ // nothing should be there after 6 seconds
+ SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
+ SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]);
+ SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[2]);
+
+ for (auto h : handles) {
+ delete h;
+ }
+ delete db_ttl_;
+ db_ttl_ = nullptr;
+}
+
+// Puts a set of values and checks its absence using Get after ttl
+TEST_F(TtlTest, ChangeTtlOnOpenDb) {
+ MakeKVMap(kSampleSize_);
+
+ OpenTtl(1); // T=0:Open the db with ttl = 2
+ SetTtl(3);
+ PutValues(0, kSampleSize_); // T=0:Insert Set1. Delete at t=2
+ SleepCompactCheck(2, 0, kSampleSize_, true); // T=2:Set1 should be there
+ CloseTtl();
+}
+
+// Test DeleteRange for DBWithTtl
+TEST_F(TtlTest, DeleteRangeTest) {
+ OpenTtl();
+ ASSERT_OK(db_ttl_->Put(WriteOptions(), "a", "val"));
+ MakeDeleteRange("a", "b");
+ ASSERT_OK(db_ttl_->Put(WriteOptions(), "c", "val"));
+ MakeDeleteRange("b", "d");
+ ASSERT_OK(db_ttl_->Put(WriteOptions(), "e", "val"));
+ MakeDeleteRange("d", "e");
+ // first iteration verifies query correctness in memtable, second verifies
+ // query correctness for a single SST file
+ for (int i = 0; i < 2; i++) {
+ if (i > 0) {
+ ASSERT_OK(db_ttl_->Flush(FlushOptions()));
+ }
+ std::string value;
+ ASSERT_TRUE(db_ttl_->Get(ReadOptions(), "a", &value).IsNotFound());
+ ASSERT_TRUE(db_ttl_->Get(ReadOptions(), "c", &value).IsNotFound());
+ ASSERT_OK(db_ttl_->Get(ReadOptions(), "e", &value));
+ }
+ CloseTtl();
+}
+
+class DummyFilter : public CompactionFilter {
+ public:
+ bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
+ std::string* /*new_value*/,
+ bool* /*value_changed*/) const override {
+ return false;
+ }
+
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "DummyFilter"; }
+};
+
+class DummyFilterFactory : public CompactionFilterFactory {
+ public:
+ const char* Name() const override { return kClassName(); }
+ static const char* kClassName() { return "DummyFilterFactory"; }
+
+ std::unique_ptr<CompactionFilter> CreateCompactionFilter(
+ const CompactionFilter::Context&) override {
+ std::unique_ptr<CompactionFilter> f(new DummyFilter());
+ return f;
+ }
+};
+
+static int RegisterTestObjects(ObjectLibrary& library,
+ const std::string& /*arg*/) {
+ library.AddFactory<CompactionFilter>(
+ "DummyFilter", [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilter>* /*guard*/,
+ std::string* /* errmsg */) {
+ static DummyFilter dummy;
+ return &dummy;
+ });
+ library.AddFactory<CompactionFilterFactory>(
+ "DummyFilterFactory", [](const std::string& /*uri*/,
+ std::unique_ptr<CompactionFilterFactory>* guard,
+ std::string* /* errmsg */) {
+ guard->reset(new DummyFilterFactory());
+ return guard->get();
+ });
+ return 2;
+}
+
+class TtlOptionsTest : public testing::Test {
+ public:
+ TtlOptionsTest() {
+ config_options_.registry->AddLibrary("RegisterTtlObjects",
+ RegisterTtlObjects, "");
+ config_options_.registry->AddLibrary("RegisterTtlTestObjects",
+ RegisterTestObjects, "");
+ }
+ ConfigOptions config_options_;
+};
+
+TEST_F(TtlOptionsTest, LoadTtlCompactionFilter) {
+ const CompactionFilter* filter = nullptr;
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_, TtlCompactionFilter::kClassName(), &filter));
+ ASSERT_NE(filter, nullptr);
+ ASSERT_STREQ(filter->Name(), TtlCompactionFilter::kClassName());
+ auto ttl = filter->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 0);
+ ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ delete filter;
+ filter = nullptr;
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_, "id=TtlCompactionFilter; ttl=123", &filter));
+ ASSERT_NE(filter, nullptr);
+ ttl = filter->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 123);
+ ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ delete filter;
+ filter = nullptr;
+
+ ASSERT_OK(CompactionFilter::CreateFromString(
+ config_options_,
+ "id=TtlCompactionFilter; ttl=456; user_filter=DummyFilter;", &filter));
+ ASSERT_NE(filter, nullptr);
+ auto inner = filter->CheckedCast<DummyFilter>();
+ ASSERT_NE(inner, nullptr);
+ ASSERT_OK(filter->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ std::string mismatch;
+ std::string opts_str = filter->ToString(config_options_);
+ const CompactionFilter* copy = nullptr;
+ ASSERT_OK(
+ CompactionFilter::CreateFromString(config_options_, opts_str, &copy));
+ ASSERT_TRUE(filter->AreEquivalent(config_options_, copy, &mismatch));
+ delete filter;
+ delete copy;
+}
+
+TEST_F(TtlOptionsTest, LoadTtlCompactionFilterFactory) {
+ std::shared_ptr<CompactionFilterFactory> cff;
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options_, TtlCompactionFilterFactory::kClassName(), &cff));
+ ASSERT_NE(cff.get(), nullptr);
+ ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
+ auto ttl = cff->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 0);
+ ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options_, "id=TtlCompactionFilterFactory; ttl=123", &cff));
+ ASSERT_NE(cff.get(), nullptr);
+ ASSERT_STREQ(cff->Name(), TtlCompactionFilterFactory::kClassName());
+ ttl = cff->GetOptions<int32_t>("TTL");
+ ASSERT_NE(ttl, nullptr);
+ ASSERT_EQ(*ttl, 123);
+ ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(
+ config_options_,
+ "id=TtlCompactionFilterFactory; ttl=456; "
+ "user_filter_factory=DummyFilterFactory;",
+ &cff));
+ ASSERT_NE(cff.get(), nullptr);
+ auto filter = cff->CreateCompactionFilter(CompactionFilter::Context());
+ ASSERT_NE(filter.get(), nullptr);
+ auto ttlf = filter->CheckedCast<TtlCompactionFilter>();
+ ASSERT_EQ(filter.get(), ttlf);
+ auto user = filter->CheckedCast<DummyFilter>();
+ ASSERT_NE(user, nullptr);
+ ASSERT_OK(cff->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ std::string opts_str = cff->ToString(config_options_);
+ std::string mismatch;
+ std::shared_ptr<CompactionFilterFactory> copy;
+ ASSERT_OK(CompactionFilterFactory::CreateFromString(config_options_, opts_str,
+ &copy));
+ ASSERT_TRUE(cff->AreEquivalent(config_options_, copy.get(), &mismatch));
+}
+
+TEST_F(TtlOptionsTest, LoadTtlMergeOperator) {
+ std::shared_ptr<MergeOperator> mo;
+
+ config_options_.invoke_prepare_options = false;
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options_, TtlMergeOperator::kClassName(), &mo));
+ ASSERT_NE(mo.get(), nullptr);
+ ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
+ ASSERT_NOK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+
+ config_options_.invoke_prepare_options = true;
+ ASSERT_OK(MergeOperator::CreateFromString(
+ config_options_, "id=TtlMergeOperator; user_operator=bytesxor", &mo));
+ ASSERT_NE(mo.get(), nullptr);
+ ASSERT_STREQ(mo->Name(), TtlMergeOperator::kClassName());
+ ASSERT_OK(mo->ValidateOptions(DBOptions(), ColumnFamilyOptions()));
+ auto ttl_mo = mo->CheckedCast<TtlMergeOperator>();
+ ASSERT_EQ(mo.get(), ttl_mo);
+ auto user = ttl_mo->CheckedCast<BytesXOROperator>();
+ ASSERT_NE(user, nullptr);
+
+ std::string mismatch;
+ std::string opts_str = mo->ToString(config_options_);
+ std::shared_ptr<MergeOperator> copy;
+ ASSERT_OK(MergeOperator::CreateFromString(config_options_, opts_str, &copy));
+ ASSERT_TRUE(mo->AreEquivalent(config_options_, copy.get(), &mismatch));
+}
+} // namespace ROCKSDB_NAMESPACE
+
+// A black-box test for the ttl wrapper around rocksdb
+int main(int argc, char** argv) {
+ ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
+
+#else
+#include <stdio.h>
+
+int main(int /*argc*/, char** /*argv*/) {
+ fprintf(stderr, "SKIPPED as DBWithTTL is not supported in ROCKSDB_LITE\n");
+ return 0;
+}
+
+#endif // !ROCKSDB_LITE