summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/table/get_context.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/rocksdb/table/get_context.cc')
-rw-r--r--src/rocksdb/table/get_context.cc604
1 files changed, 604 insertions, 0 deletions
diff --git a/src/rocksdb/table/get_context.cc b/src/rocksdb/table/get_context.cc
new file mode 100644
index 000000000..69e752714
--- /dev/null
+++ b/src/rocksdb/table/get_context.cc
@@ -0,0 +1,604 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "table/get_context.h"
+
+#include "db/blob//blob_fetcher.h"
+#include "db/merge_helper.h"
+#include "db/pinned_iterators_manager.h"
+#include "db/read_callback.h"
+#include "db/wide/wide_column_serialization.h"
+#include "monitoring/file_read_sample.h"
+#include "monitoring/perf_context_imp.h"
+#include "monitoring/statistics.h"
+#include "rocksdb/merge_operator.h"
+#include "rocksdb/statistics.h"
+#include "rocksdb/system_clock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+namespace {
+
+void appendToReplayLog(std::string* replay_log, ValueType type, Slice value) {
+#ifndef ROCKSDB_LITE
+ if (replay_log) {
+ if (replay_log->empty()) {
+ // Optimization: in the common case of only one operation in the
+ // log, we allocate the exact amount of space needed.
+ replay_log->reserve(1 + VarintLength(value.size()) + value.size());
+ }
+ replay_log->push_back(type);
+ PutLengthPrefixedSlice(replay_log, value);
+ }
+#else
+ (void)replay_log;
+ (void)type;
+ (void)value;
+#endif // ROCKSDB_LITE
+}
+
+} // namespace
+
+GetContext::GetContext(
+ const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
+ Statistics* statistics, GetState init_state, const Slice& user_key,
+ PinnableSlice* pinnable_val, PinnableWideColumns* columns,
+ std::string* timestamp, bool* value_found, MergeContext* merge_context,
+ bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
+ SystemClock* clock, SequenceNumber* seq,
+ PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
+ bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
+ : ucmp_(ucmp),
+ merge_operator_(merge_operator),
+ logger_(logger),
+ statistics_(statistics),
+ state_(init_state),
+ user_key_(user_key),
+ pinnable_val_(pinnable_val),
+ columns_(columns),
+ timestamp_(timestamp),
+ value_found_(value_found),
+ merge_context_(merge_context),
+ max_covering_tombstone_seq_(_max_covering_tombstone_seq),
+ clock_(clock),
+ seq_(seq),
+ replay_log_(nullptr),
+ pinned_iters_mgr_(_pinned_iters_mgr),
+ callback_(callback),
+ do_merge_(do_merge),
+ is_blob_index_(is_blob_index),
+ tracing_get_id_(tracing_get_id),
+ blob_fetcher_(blob_fetcher) {
+ if (seq_) {
+ *seq_ = kMaxSequenceNumber;
+ }
+ sample_ = should_sample_file_read();
+}
+
+GetContext::GetContext(const Comparator* ucmp,
+ const MergeOperator* merge_operator, Logger* logger,
+ Statistics* statistics, GetState init_state,
+ const Slice& user_key, PinnableSlice* pinnable_val,
+ PinnableWideColumns* columns, bool* value_found,
+ MergeContext* merge_context, bool do_merge,
+ SequenceNumber* _max_covering_tombstone_seq,
+ SystemClock* clock, SequenceNumber* seq,
+ PinnedIteratorsManager* _pinned_iters_mgr,
+ ReadCallback* callback, bool* is_blob_index,
+ uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
+ : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
+ pinnable_val, columns, /*timestamp=*/nullptr, value_found,
+ merge_context, do_merge, _max_covering_tombstone_seq, clock,
+ seq, _pinned_iters_mgr, callback, is_blob_index,
+ tracing_get_id, blob_fetcher) {}
+
+// Called from TableCache::Get and Table::Get when file/block in which
+// key may exist are not there in TableCache/BlockCache respectively. In this
+// case we can't guarantee that key does not exist and are not permitted to do
+// IO to be certain.Set the status=kFound and value_found=false to let the
+// caller know that key may exist but is not there in memory
+void GetContext::MarkKeyMayExist() {
+ state_ = kFound;
+ if (value_found_ != nullptr) {
+ *value_found_ = false;
+ }
+}
+
+void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
+ assert(state_ == kNotFound);
+ appendToReplayLog(replay_log_, kTypeValue, value);
+
+ state_ = kFound;
+ if (LIKELY(pinnable_val_ != nullptr)) {
+ pinnable_val_->PinSelf(value);
+ }
+}
+
+void GetContext::ReportCounters() {
+ if (get_context_stats_.num_cache_hit > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
+ }
+ if (get_context_stats_.num_cache_index_hit > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
+ get_context_stats_.num_cache_index_hit);
+ }
+ if (get_context_stats_.num_cache_data_hit > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
+ get_context_stats_.num_cache_data_hit);
+ }
+ if (get_context_stats_.num_cache_filter_hit > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
+ get_context_stats_.num_cache_filter_hit);
+ }
+ if (get_context_stats_.num_cache_compression_dict_hit > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
+ get_context_stats_.num_cache_compression_dict_hit);
+ }
+ if (get_context_stats_.num_cache_index_miss > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
+ get_context_stats_.num_cache_index_miss);
+ }
+ if (get_context_stats_.num_cache_filter_miss > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
+ get_context_stats_.num_cache_filter_miss);
+ }
+ if (get_context_stats_.num_cache_data_miss > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
+ get_context_stats_.num_cache_data_miss);
+ }
+ if (get_context_stats_.num_cache_compression_dict_miss > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
+ get_context_stats_.num_cache_compression_dict_miss);
+ }
+ if (get_context_stats_.num_cache_bytes_read > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
+ get_context_stats_.num_cache_bytes_read);
+ }
+ if (get_context_stats_.num_cache_miss > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_MISS,
+ get_context_stats_.num_cache_miss);
+ }
+ if (get_context_stats_.num_cache_add > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
+ }
+ if (get_context_stats_.num_cache_add_redundant > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
+ get_context_stats_.num_cache_add_redundant);
+ }
+ if (get_context_stats_.num_cache_bytes_write > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
+ get_context_stats_.num_cache_bytes_write);
+ }
+ if (get_context_stats_.num_cache_index_add > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
+ get_context_stats_.num_cache_index_add);
+ }
+ if (get_context_stats_.num_cache_index_add_redundant > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
+ get_context_stats_.num_cache_index_add_redundant);
+ }
+ if (get_context_stats_.num_cache_index_bytes_insert > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
+ get_context_stats_.num_cache_index_bytes_insert);
+ }
+ if (get_context_stats_.num_cache_data_add > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
+ get_context_stats_.num_cache_data_add);
+ }
+ if (get_context_stats_.num_cache_data_add_redundant > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
+ get_context_stats_.num_cache_data_add_redundant);
+ }
+ if (get_context_stats_.num_cache_data_bytes_insert > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
+ get_context_stats_.num_cache_data_bytes_insert);
+ }
+ if (get_context_stats_.num_cache_filter_add > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
+ get_context_stats_.num_cache_filter_add);
+ }
+ if (get_context_stats_.num_cache_filter_add_redundant > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
+ get_context_stats_.num_cache_filter_add_redundant);
+ }
+ if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
+ get_context_stats_.num_cache_filter_bytes_insert);
+ }
+ if (get_context_stats_.num_cache_compression_dict_add > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
+ get_context_stats_.num_cache_compression_dict_add);
+ }
+ if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
+ get_context_stats_.num_cache_compression_dict_add_redundant);
+ }
+ if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
+ RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
+ get_context_stats_.num_cache_compression_dict_bytes_insert);
+ }
+}
+
+bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
+ const Slice& value, bool* matched,
+ Cleanable* value_pinner) {
+ assert(matched);
+ assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
+ merge_context_ != nullptr);
+ if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
+ *matched = true;
+ // If the value is not in the snapshot, skip it
+ if (!CheckCallback(parsed_key.sequence)) {
+ return true; // to continue to the next seq
+ }
+
+ appendToReplayLog(replay_log_, parsed_key.type, value);
+
+ if (seq_ != nullptr) {
+ // Set the sequence number if it is uninitialized
+ if (*seq_ == kMaxSequenceNumber) {
+ *seq_ = parsed_key.sequence;
+ }
+ if (max_covering_tombstone_seq_) {
+ *seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
+ }
+ }
+
+ size_t ts_sz = ucmp_->timestamp_size();
+ if (ts_sz > 0 && timestamp_ != nullptr) {
+ if (!timestamp_->empty()) {
+ assert(ts_sz == timestamp_->size());
+ // `timestamp` can be set before `SaveValue` is ever called
+ // when max_covering_tombstone_seq_ was set.
+ // If this key has a higher sequence number than range tombstone,
+ // then timestamp should be updated. `ts_from_rangetombstone_` is
+ // set to false afterwards so that only the key with highest seqno
+ // updates the timestamp.
+ if (ts_from_rangetombstone_) {
+ assert(max_covering_tombstone_seq_);
+ if (parsed_key.sequence > *max_covering_tombstone_seq_) {
+ Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
+ timestamp_->assign(ts.data(), ts.size());
+ ts_from_rangetombstone_ = false;
+ }
+ }
+ }
+ // TODO optimize for small size ts
+ const std::string kMaxTs(ts_sz, '\xff');
+ if (timestamp_->empty() ||
+ ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
+ Slice ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
+ timestamp_->assign(ts.data(), ts.size());
+ }
+ }
+
+ auto type = parsed_key.type;
+ // Key matches. Process it
+ if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
+ type == kTypeWideColumnEntity || type == kTypeDeletion ||
+ type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
+ max_covering_tombstone_seq_ != nullptr &&
+ *max_covering_tombstone_seq_ > parsed_key.sequence) {
+ // Note that deletion types are also considered, this is for the case
+ // when we need to return timestamp to user. If a range tombstone has a
+ // higher seqno than point tombstone, its timestamp should be returned.
+ type = kTypeRangeDeletion;
+ }
+ switch (type) {
+ case kTypeValue:
+ case kTypeBlobIndex:
+ case kTypeWideColumnEntity:
+ assert(state_ == kNotFound || state_ == kMerge);
+ if (type == kTypeBlobIndex) {
+ if (is_blob_index_ == nullptr) {
+ // Blob value not supported. Stop.
+ state_ = kUnexpectedBlobIndex;
+ return false;
+ }
+ }
+
+ if (is_blob_index_ != nullptr) {
+ *is_blob_index_ = (type == kTypeBlobIndex);
+ }
+
+ if (kNotFound == state_) {
+ state_ = kFound;
+ if (do_merge_) {
+ if (LIKELY(pinnable_val_ != nullptr)) {
+ Slice value_to_use = value;
+
+ if (type == kTypeWideColumnEntity) {
+ Slice value_copy = value;
+
+ if (!WideColumnSerialization::GetValueOfDefaultColumn(
+ value_copy, value_to_use)
+ .ok()) {
+ state_ = kCorrupt;
+ return false;
+ }
+ }
+
+ if (LIKELY(value_pinner != nullptr)) {
+ // If the backing resources for the value are provided, pin them
+ pinnable_val_->PinSlice(value_to_use, value_pinner);
+ } else {
+ TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
+ this);
+ // Otherwise copy the value
+ pinnable_val_->PinSelf(value_to_use);
+ }
+ } else if (columns_ != nullptr) {
+ if (type == kTypeWideColumnEntity) {
+ if (!columns_->SetWideColumnValue(value, value_pinner).ok()) {
+ state_ = kCorrupt;
+ return false;
+ }
+ } else {
+ columns_->SetPlainValue(value, value_pinner);
+ }
+ }
+ } else {
+ // It means this function is called as part of DB GetMergeOperands
+ // API and the current value should be part of
+ // merge_context_->operand_list
+ if (type == kTypeBlobIndex) {
+ PinnableSlice pin_val;
+ if (GetBlobValue(value, &pin_val) == false) {
+ return false;
+ }
+ Slice blob_value(pin_val);
+ push_operand(blob_value, nullptr);
+ } else if (type == kTypeWideColumnEntity) {
+ Slice value_copy = value;
+ Slice value_of_default;
+
+ if (!WideColumnSerialization::GetValueOfDefaultColumn(
+ value_copy, value_of_default)
+ .ok()) {
+ state_ = kCorrupt;
+ return false;
+ }
+
+ push_operand(value_of_default, value_pinner);
+ } else {
+ assert(type == kTypeValue);
+ push_operand(value, value_pinner);
+ }
+ }
+ } else if (kMerge == state_) {
+ assert(merge_operator_ != nullptr);
+ if (type == kTypeBlobIndex) {
+ PinnableSlice pin_val;
+ if (GetBlobValue(value, &pin_val) == false) {
+ return false;
+ }
+ Slice blob_value(pin_val);
+ state_ = kFound;
+ if (do_merge_) {
+ Merge(&blob_value);
+ } else {
+ // It means this function is called as part of DB GetMergeOperands
+ // API and the current value should be part of
+ // merge_context_->operand_list
+ push_operand(blob_value, nullptr);
+ }
+ } else if (type == kTypeWideColumnEntity) {
+ state_ = kFound;
+
+ if (do_merge_) {
+ MergeWithEntity(value);
+ } else {
+ // It means this function is called as part of DB GetMergeOperands
+ // API and the current value should be part of
+ // merge_context_->operand_list
+ Slice value_copy = value;
+ Slice value_of_default;
+
+ if (!WideColumnSerialization::GetValueOfDefaultColumn(
+ value_copy, value_of_default)
+ .ok()) {
+ state_ = kCorrupt;
+ return false;
+ }
+
+ push_operand(value_of_default, value_pinner);
+ }
+ } else {
+ assert(type == kTypeValue);
+
+ state_ = kFound;
+ if (do_merge_) {
+ Merge(&value);
+ } else {
+ // It means this function is called as part of DB GetMergeOperands
+ // API and the current value should be part of
+ // merge_context_->operand_list
+ push_operand(value, value_pinner);
+ }
+ }
+ }
+ return false;
+
+ case kTypeDeletion:
+ case kTypeDeletionWithTimestamp:
+ case kTypeSingleDeletion:
+ case kTypeRangeDeletion:
+ // TODO(noetzli): Verify correctness once merge of single-deletes
+ // is supported
+ assert(state_ == kNotFound || state_ == kMerge);
+ if (kNotFound == state_) {
+ state_ = kDeleted;
+ } else if (kMerge == state_) {
+ state_ = kFound;
+ if (do_merge_) {
+ Merge(nullptr);
+ }
+ // If do_merge_ = false then the current value shouldn't be part of
+ // merge_context_->operand_list
+ }
+ return false;
+
+ case kTypeMerge:
+ assert(state_ == kNotFound || state_ == kMerge);
+ state_ = kMerge;
+ // value_pinner is not set from plain_table_reader.cc for example.
+ push_operand(value, value_pinner);
+ if (do_merge_ && merge_operator_ != nullptr &&
+ merge_operator_->ShouldMerge(
+ merge_context_->GetOperandsDirectionBackward())) {
+ state_ = kFound;
+ Merge(nullptr);
+ return false;
+ }
+ return true;
+
+ default:
+ assert(false);
+ break;
+ }
+ }
+
+ // state_ could be Corrupt, merge or notfound
+ return false;
+}
+
+void GetContext::Merge(const Slice* value) {
+ assert(do_merge_);
+ assert(!pinnable_val_ || !columns_);
+
+ std::string result;
+ const Status s = MergeHelper::TimedFullMerge(
+ merge_operator_, user_key_, value, merge_context_->GetOperands(), &result,
+ logger_, statistics_, clock_, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
+ if (!s.ok()) {
+ state_ = kCorrupt;
+ return;
+ }
+
+ if (LIKELY(pinnable_val_ != nullptr)) {
+ *(pinnable_val_->GetSelf()) = std::move(result);
+ pinnable_val_->PinSelf();
+ return;
+ }
+
+ assert(columns_);
+ columns_->SetPlainValue(result);
+}
+
+void GetContext::MergeWithEntity(Slice entity) {
+ assert(do_merge_);
+ assert(!pinnable_val_ || !columns_);
+
+ if (LIKELY(pinnable_val_ != nullptr)) {
+ Slice value_of_default;
+
+ {
+ const Status s = WideColumnSerialization::GetValueOfDefaultColumn(
+ entity, value_of_default);
+ if (!s.ok()) {
+ state_ = kCorrupt;
+ return;
+ }
+ }
+
+ {
+ const Status s = MergeHelper::TimedFullMerge(
+ merge_operator_, user_key_, &value_of_default,
+ merge_context_->GetOperands(), pinnable_val_->GetSelf(), logger_,
+ statistics_, clock_, /* result_operand */ nullptr,
+ /* update_num_ops_stats */ true);
+ if (!s.ok()) {
+ state_ = kCorrupt;
+ return;
+ }
+ }
+
+ pinnable_val_->PinSelf();
+ return;
+ }
+
+ std::string result;
+
+ {
+ const Status s = MergeHelper::TimedFullMergeWithEntity(
+ merge_operator_, user_key_, entity, merge_context_->GetOperands(),
+ &result, logger_, statistics_, clock_, /* update_num_ops_stats */ true);
+ if (!s.ok()) {
+ state_ = kCorrupt;
+ return;
+ }
+ }
+
+ {
+ assert(columns_);
+ const Status s = columns_->SetWideColumnValue(result);
+ if (!s.ok()) {
+ state_ = kCorrupt;
+ return;
+ }
+ }
+}
+
+bool GetContext::GetBlobValue(const Slice& blob_index,
+ PinnableSlice* blob_value) {
+ constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
+ constexpr uint64_t* bytes_read = nullptr;
+
+ Status status = blob_fetcher_->FetchBlob(
+ user_key_, blob_index, prefetch_buffer, blob_value, bytes_read);
+ if (!status.ok()) {
+ if (status.IsIncomplete()) {
+ // FIXME: this code is not covered by unit tests
+ MarkKeyMayExist();
+ return false;
+ }
+ state_ = kCorrupt;
+ return false;
+ }
+ *is_blob_index_ = false;
+ return true;
+}
+
+void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
+ // TODO(yanqin) preserve timestamps information in merge_context
+ if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
+ value_pinner != nullptr) {
+ value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
+ merge_context_->PushOperand(value, true /*value_pinned*/);
+ } else {
+ merge_context_->PushOperand(value, false);
+ }
+}
+
+void replayGetContextLog(const Slice& replay_log, const Slice& user_key,
+ GetContext* get_context, Cleanable* value_pinner) {
+#ifndef ROCKSDB_LITE
+ Slice s = replay_log;
+ while (s.size()) {
+ auto type = static_cast<ValueType>(*s.data());
+ s.remove_prefix(1);
+ Slice value;
+ bool ret = GetLengthPrefixedSlice(&s, &value);
+ assert(ret);
+ (void)ret;
+
+ bool dont_care __attribute__((__unused__));
+ // Since SequenceNumber is not stored and unknown, we will use
+ // kMaxSequenceNumber.
+ get_context->SaveValue(
+ ParsedInternalKey(user_key, kMaxSequenceNumber, type), value,
+ &dont_care, value_pinner);
+ }
+#else // ROCKSDB_LITE
+ (void)replay_log;
+ (void)user_key;
+ (void)get_context;
+ (void)value_pinner;
+ assert(false);
+#endif // ROCKSDB_LITE
+}
+
+} // namespace ROCKSDB_NAMESPACE