summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/utilities/cache_dump_load_impl.cc
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-21 11:54:28 +0000
commite6918187568dbd01842d8d1d2c808ce16a894239 (patch)
tree64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/cache_dump_load_impl.cc
parentInitial commit. (diff)
downloadceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz
ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/cache_dump_load_impl.cc')
-rw-r--r--src/rocksdb/utilities/cache_dump_load_impl.cc393
1 files changed, 393 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/cache_dump_load_impl.cc b/src/rocksdb/utilities/cache_dump_load_impl.cc
new file mode 100644
index 000000000..2b9f2a29d
--- /dev/null
+++ b/src/rocksdb/utilities/cache_dump_load_impl.cc
@@ -0,0 +1,393 @@
+// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#include "cache/cache_key.h"
+#include "table/block_based/block_based_table_reader.h"
+#ifndef ROCKSDB_LITE
+
+#include "cache/cache_entry_roles.h"
+#include "file/writable_file_writer.h"
+#include "port/lang.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/utilities/ldb_cmd.h"
+#include "table/format.h"
+#include "util/crc32c.h"
+#include "utilities/cache_dump_load_impl.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+// Set the dump filter with a list of DBs. Block cache may be shared by multipe
+// DBs and we may only want to dump out the blocks belonging to certain DB(s).
+// Therefore, a filter is need to decide if the key of the block satisfy the
+// requirement.
+Status CacheDumperImpl::SetDumpFilter(std::vector<DB*> db_list) {
+ Status s = Status::OK();
+ for (size_t i = 0; i < db_list.size(); i++) {
+ assert(i < db_list.size());
+ TablePropertiesCollection ptc;
+ assert(db_list[i] != nullptr);
+ s = db_list[i]->GetPropertiesOfAllTables(&ptc);
+ if (!s.ok()) {
+ return s;
+ }
+ for (auto id = ptc.begin(); id != ptc.end(); id++) {
+ OffsetableCacheKey base;
+ // We only want to save cache entries that are portable to another
+ // DB::Open, so only save entries with stable keys.
+ bool is_stable;
+ BlockBasedTable::SetupBaseCacheKey(id->second.get(),
+ /*cur_db_session_id*/ "",
+ /*cur_file_num*/ 0, &base, &is_stable);
+ if (is_stable) {
+ Slice prefix_slice = base.CommonPrefixSlice();
+ assert(prefix_slice.size() == OffsetableCacheKey::kCommonPrefixSize);
+ prefix_filter_.insert(prefix_slice.ToString());
+ }
+ }
+ }
+ return s;
+}
+
+// This is the main function to dump out the cache block entries to the writer.
+// The writer may create a file or write to other systems. Currently, we will
+// iterate the whole block cache, get the blocks, and write them to the writer
+IOStatus CacheDumperImpl::DumpCacheEntriesToWriter() {
+ // Prepare stage, check the parameters.
+ if (cache_ == nullptr) {
+ return IOStatus::InvalidArgument("Cache is null");
+ }
+ if (writer_ == nullptr) {
+ return IOStatus::InvalidArgument("CacheDumpWriter is null");
+ }
+ // Set the system clock
+ if (options_.clock == nullptr) {
+ return IOStatus::InvalidArgument("System clock is null");
+ }
+ clock_ = options_.clock;
+ // We copy the Cache Deleter Role Map as its member.
+ role_map_ = CopyCacheDeleterRoleMap();
+ // Set the sequence number
+ sequence_num_ = 0;
+
+ // Dump stage, first, we write the hader
+ IOStatus io_s = WriteHeader();
+ if (!io_s.ok()) {
+ return io_s;
+ }
+
+ // Then, we iterate the block cache and dump out the blocks that are not
+ // filtered out.
+ cache_->ApplyToAllEntries(DumpOneBlockCallBack(), {});
+
+ // Finally, write the footer
+ io_s = WriteFooter();
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ io_s = writer_->Close();
+ return io_s;
+}
+
+// Check if we need to filter out the block based on its key
+bool CacheDumperImpl::ShouldFilterOut(const Slice& key) {
+ if (key.size() < OffsetableCacheKey::kCommonPrefixSize) {
+ return /*filter out*/ true;
+ }
+ Slice key_prefix(key.data(), OffsetableCacheKey::kCommonPrefixSize);
+ std::string prefix = key_prefix.ToString();
+ // Filter out if not found
+ return prefix_filter_.find(prefix) == prefix_filter_.end();
+}
+
+// This is the callback function which will be applied to
+// Cache::ApplyToAllEntries. In this callback function, we will get the block
+// type, decide if the block needs to be dumped based on the filter, and write
+// the block through the provided writer.
+std::function<void(const Slice&, void*, size_t, Cache::DeleterFn)>
+CacheDumperImpl::DumpOneBlockCallBack() {
+ return [&](const Slice& key, void* value, size_t /*charge*/,
+ Cache::DeleterFn deleter) {
+ // Step 1: get the type of the block from role_map_
+ auto e = role_map_.find(deleter);
+ CacheEntryRole role;
+ CacheDumpUnitType type = CacheDumpUnitType::kBlockTypeMax;
+ if (e == role_map_.end()) {
+ role = CacheEntryRole::kMisc;
+ } else {
+ role = e->second;
+ }
+ bool filter_out = false;
+
+ // Step 2: based on the key prefix, check if the block should be filter out.
+ if (ShouldFilterOut(key)) {
+ filter_out = true;
+ }
+
+ // Step 3: based on the block type, get the block raw pointer and length.
+ const char* block_start = nullptr;
+ size_t block_len = 0;
+ switch (role) {
+ case CacheEntryRole::kDataBlock:
+ type = CacheDumpUnitType::kData;
+ block_start = (static_cast<Block*>(value))->data();
+ block_len = (static_cast<Block*>(value))->size();
+ break;
+ case CacheEntryRole::kFilterBlock:
+ type = CacheDumpUnitType::kFilter;
+ block_start = (static_cast<ParsedFullFilterBlock*>(value))
+ ->GetBlockContentsData()
+ .data();
+ block_len = (static_cast<ParsedFullFilterBlock*>(value))
+ ->GetBlockContentsData()
+ .size();
+ break;
+ case CacheEntryRole::kFilterMetaBlock:
+ type = CacheDumpUnitType::kFilterMetaBlock;
+ block_start = (static_cast<Block*>(value))->data();
+ block_len = (static_cast<Block*>(value))->size();
+ break;
+ case CacheEntryRole::kIndexBlock:
+ type = CacheDumpUnitType::kIndex;
+ block_start = (static_cast<Block*>(value))->data();
+ block_len = (static_cast<Block*>(value))->size();
+ break;
+ case CacheEntryRole::kDeprecatedFilterBlock:
+ // Obsolete
+ filter_out = true;
+ break;
+ case CacheEntryRole::kMisc:
+ filter_out = true;
+ break;
+ case CacheEntryRole::kOtherBlock:
+ filter_out = true;
+ break;
+ case CacheEntryRole::kWriteBuffer:
+ filter_out = true;
+ break;
+ default:
+ filter_out = true;
+ }
+
+ // Step 4: if the block should not be filter out, write the block to the
+ // CacheDumpWriter
+ if (!filter_out && block_start != nullptr) {
+ WriteBlock(type, key, Slice(block_start, block_len))
+ .PermitUncheckedError();
+ }
+ };
+}
+
+// Write the block to the writer. It takes the timestamp of the
+// block being copied from block cache, block type, key, block pointer,
+// block size and block checksum as the input. When writing the dumper raw
+// block, we first create the dump unit and encoude it to a string. Then,
+// we calculate the checksum of the whole dump unit string and store it in
+// the dump unit metadata.
+// First, we write the metadata first, which is a fixed size string. Then, we
+// Append the dump unit string to the writer.
+IOStatus CacheDumperImpl::WriteBlock(CacheDumpUnitType type, const Slice& key,
+ const Slice& value) {
+ uint64_t timestamp = clock_->NowMicros();
+ uint32_t value_checksum = crc32c::Value(value.data(), value.size());
+
+ // First, serialize the block information in a string
+ DumpUnit dump_unit;
+ dump_unit.timestamp = timestamp;
+ dump_unit.key = key;
+ dump_unit.type = type;
+ dump_unit.value_len = value.size();
+ dump_unit.value = const_cast<char*>(value.data());
+ dump_unit.value_checksum = value_checksum;
+ std::string encoded_data;
+ CacheDumperHelper::EncodeDumpUnit(dump_unit, &encoded_data);
+
+ // Second, create the metadata, which contains a sequence number, the dump
+ // unit string checksum and the string size. The sequence number monotonically
+ // increases from 0.
+ DumpUnitMeta unit_meta;
+ unit_meta.sequence_num = sequence_num_;
+ sequence_num_++;
+ unit_meta.dump_unit_checksum =
+ crc32c::Value(encoded_data.data(), encoded_data.size());
+ unit_meta.dump_unit_size = encoded_data.size();
+ std::string encoded_meta;
+ CacheDumperHelper::EncodeDumpUnitMeta(unit_meta, &encoded_meta);
+
+ // We write the metadata first.
+ assert(writer_ != nullptr);
+ IOStatus io_s = writer_->WriteMetadata(encoded_meta);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ // followed by the dump unit.
+ return writer_->WritePacket(encoded_data);
+}
+
+// Before we write any block, we write the header first to store the cache dump
+// format version, rocksdb version, and brief intro.
+IOStatus CacheDumperImpl::WriteHeader() {
+ std::string header_key = "header";
+ std::ostringstream s;
+ s << kTraceMagic << "\t"
+ << "Cache dump format version: " << kCacheDumpMajorVersion << "."
+ << kCacheDumpMinorVersion << "\t"
+ << "RocksDB Version: " << kMajorVersion << "." << kMinorVersion << "\t"
+ << "Format: dump_unit_metadata <sequence_number, dump_unit_checksum, "
+ "dump_unit_size>, dump_unit <timestamp, key, block_type, "
+ "block_size, block_data, block_checksum> cache_value\n";
+ std::string header_value(s.str());
+ CacheDumpUnitType type = CacheDumpUnitType::kHeader;
+ return WriteBlock(type, header_key, header_value);
+}
+
+// Write the footer after all the blocks are stored to indicate the ending.
+IOStatus CacheDumperImpl::WriteFooter() {
+ std::string footer_key = "footer";
+ std::string footer_value("cache dump completed");
+ CacheDumpUnitType type = CacheDumpUnitType::kFooter;
+ return WriteBlock(type, footer_key, footer_value);
+}
+
+// This is the main function to restore the cache entries to secondary cache.
+// First, we check if all the arguments are valid. Then, we read the block
+// sequentially from the reader and insert them to the secondary cache.
+IOStatus CacheDumpedLoaderImpl::RestoreCacheEntriesToSecondaryCache() {
+ // TODO: remove this line when options are used in the loader
+ (void)options_;
+ // Step 1: we check if all the arguments are valid
+ if (secondary_cache_ == nullptr) {
+ return IOStatus::InvalidArgument("Secondary Cache is null");
+ }
+ if (reader_ == nullptr) {
+ return IOStatus::InvalidArgument("CacheDumpReader is null");
+ }
+ // we copy the Cache Deleter Role Map as its member.
+ role_map_ = CopyCacheDeleterRoleMap();
+
+ // Step 2: read the header
+ // TODO: we need to check the cache dump format version and RocksDB version
+ // after the header is read out.
+ IOStatus io_s;
+ DumpUnit dump_unit;
+ std::string data;
+ io_s = ReadHeader(&data, &dump_unit);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+
+ // Step 3: read out the rest of the blocks from the reader. The loop will stop
+ // either I/O status is not ok or we reach to the the end.
+ while (io_s.ok()) {
+ dump_unit.reset();
+ data.clear();
+ // read the content and store in the dump_unit
+ io_s = ReadCacheBlock(&data, &dump_unit);
+ if (!io_s.ok()) {
+ break;
+ }
+ if (dump_unit.type == CacheDumpUnitType::kFooter) {
+ break;
+ }
+ // Create the uncompressed_block based on the information in the dump_unit
+ // (There is no block trailer here compatible with block-based SST file.)
+ Slice content =
+ Slice(static_cast<char*>(dump_unit.value), dump_unit.value_len);
+ Status s = secondary_cache_->InsertSaved(dump_unit.key, content);
+ if (!s.ok()) {
+ io_s = status_to_io_status(std::move(s));
+ }
+ }
+ if (dump_unit.type == CacheDumpUnitType::kFooter) {
+ return IOStatus::OK();
+ } else {
+ return io_s;
+ }
+}
+
+// Read and copy the dump unit metadata to std::string data, decode and create
+// the unit metadata based on the string
+IOStatus CacheDumpedLoaderImpl::ReadDumpUnitMeta(std::string* data,
+ DumpUnitMeta* unit_meta) {
+ assert(reader_ != nullptr);
+ assert(data != nullptr);
+ assert(unit_meta != nullptr);
+ IOStatus io_s = reader_->ReadMetadata(data);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ return status_to_io_status(
+ CacheDumperHelper::DecodeDumpUnitMeta(*data, unit_meta));
+}
+
+// Read and copy the dump unit to std::string data, decode and create the unit
+// based on the string
+IOStatus CacheDumpedLoaderImpl::ReadDumpUnit(size_t len, std::string* data,
+ DumpUnit* unit) {
+ assert(reader_ != nullptr);
+ assert(data != nullptr);
+ assert(unit != nullptr);
+ IOStatus io_s = reader_->ReadPacket(data);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ if (data->size() != len) {
+ return IOStatus::Corruption(
+ "The data being read out does not match the size stored in metadata!");
+ }
+ Slice block;
+ return status_to_io_status(CacheDumperHelper::DecodeDumpUnit(*data, unit));
+}
+
+// Read the header
+IOStatus CacheDumpedLoaderImpl::ReadHeader(std::string* data,
+ DumpUnit* dump_unit) {
+ DumpUnitMeta header_meta;
+ header_meta.reset();
+ std::string meta_string;
+ IOStatus io_s = ReadDumpUnitMeta(&meta_string, &header_meta);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+
+ io_s = ReadDumpUnit(header_meta.dump_unit_size, data, dump_unit);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ uint32_t unit_checksum = crc32c::Value(data->data(), data->size());
+ if (unit_checksum != header_meta.dump_unit_checksum) {
+ return IOStatus::Corruption("Read header unit corrupted!");
+ }
+ return io_s;
+}
+
+// Read the blocks after header is read out
+IOStatus CacheDumpedLoaderImpl::ReadCacheBlock(std::string* data,
+ DumpUnit* dump_unit) {
+ // According to the write process, we read the dump_unit_metadata first
+ DumpUnitMeta unit_meta;
+ unit_meta.reset();
+ std::string unit_string;
+ IOStatus io_s = ReadDumpUnitMeta(&unit_string, &unit_meta);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+
+ // Based on the information in the dump_unit_metadata, we read the dump_unit
+ // and verify if its content is correct.
+ io_s = ReadDumpUnit(unit_meta.dump_unit_size, data, dump_unit);
+ if (!io_s.ok()) {
+ return io_s;
+ }
+ uint32_t unit_checksum = crc32c::Value(data->data(), data->size());
+ if (unit_checksum != unit_meta.dump_unit_checksum) {
+ return IOStatus::Corruption(
+ "Checksum does not match! Read dumped unit corrupted!");
+ }
+ return io_s;
+}
+
+} // namespace ROCKSDB_NAMESPACE
+#endif // ROCKSDB_LITE