diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-04-21 11:54:28 +0000 |
commit | e6918187568dbd01842d8d1d2c808ce16a894239 (patch) | |
tree | 64f88b554b444a49f656b6c656111a145cbbaa28 /src/rocksdb/utilities/blob_db/blob_file.cc | |
parent | Initial commit. (diff) | |
download | ceph-e6918187568dbd01842d8d1d2c808ce16a894239.tar.xz ceph-e6918187568dbd01842d8d1d2c808ce16a894239.zip |
Adding upstream version 18.2.2.upstream/18.2.2
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'src/rocksdb/utilities/blob_db/blob_file.cc')
-rw-r--r-- | src/rocksdb/utilities/blob_db/blob_file.cc | 318 |
1 files changed, 318 insertions, 0 deletions
diff --git a/src/rocksdb/utilities/blob_db/blob_file.cc b/src/rocksdb/utilities/blob_db/blob_file.cc new file mode 100644 index 000000000..c68e557c6 --- /dev/null +++ b/src/rocksdb/utilities/blob_db/blob_file.cc @@ -0,0 +1,318 @@ + +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +#ifndef ROCKSDB_LITE +#include "utilities/blob_db/blob_file.h" + +#include <stdio.h> + +#include <algorithm> +#include <cinttypes> +#include <memory> + +#include "db/column_family.h" +#include "db/db_impl/db_impl.h" +#include "db/dbformat.h" +#include "file/filename.h" +#include "file/readahead_raf.h" +#include "logging/logging.h" +#include "utilities/blob_db/blob_db_impl.h" + +namespace ROCKSDB_NAMESPACE { + +namespace blob_db { + +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log) + : parent_(p), path_to_dir_(bdir), file_number_(fn), info_log_(info_log) {} + +BlobFile::BlobFile(const BlobDBImpl* p, const std::string& bdir, uint64_t fn, + Logger* info_log, uint32_t column_family_id, + CompressionType compression, bool has_ttl, + const ExpirationRange& expiration_range) + : parent_(p), + path_to_dir_(bdir), + file_number_(fn), + info_log_(info_log), + column_family_id_(column_family_id), + compression_(compression), + has_ttl_(has_ttl), + expiration_range_(expiration_range), + header_(column_family_id, compression, has_ttl, expiration_range), + header_valid_(true) {} + +BlobFile::~BlobFile() { + if (obsolete_) { + std::string pn(PathName()); + Status s = Env::Default()->DeleteFile(PathName()); + if (!s.ok()) { + // ROCKS_LOG_INFO(db_options_.info_log, + // "File could not be deleted %s", pn.c_str()); + } + } +} + +uint32_t BlobFile::GetColumnFamilyId() const { return column_family_id_; } + +std::string BlobFile::PathName() const { + return BlobFileName(path_to_dir_, file_number_); +} + +std::string BlobFile::DumpState() const { + char str[1000]; + snprintf( + str, sizeof(str), + "path: %s fn: %" PRIu64 " blob_count: %" PRIu64 " file_size: %" PRIu64 + " closed: %d obsolete: %d expiration_range: (%" PRIu64 ", %" PRIu64 + "), writer: %d reader: %d", + path_to_dir_.c_str(), file_number_, blob_count_.load(), file_size_.load(), + closed_.load(), obsolete_.load(), expiration_range_.first, + expiration_range_.second, (!!log_writer_), (!!ra_file_reader_)); + return str; +} + +void BlobFile::MarkObsolete(SequenceNumber sequence) { + assert(Immutable()); + obsolete_sequence_ = sequence; + obsolete_.store(true); +} + +Status BlobFile::WriteFooterAndCloseLocked(SequenceNumber sequence) { + BlobLogFooter footer; + footer.blob_count = blob_count_; + if (HasTTL()) { + footer.expiration_range = expiration_range_; + } + + // this will close the file and reset the Writable File Pointer. + Status s = log_writer_->AppendFooter(footer, /* checksum_method */ nullptr, + /* checksum_value */ nullptr); + if (s.ok()) { + closed_ = true; + immutable_sequence_ = sequence; + file_size_ += BlobLogFooter::kSize; + } + // delete the sequential writer + log_writer_.reset(); + return s; +} + +Status BlobFile::ReadFooter(BlobLogFooter* bf) { + if (file_size_ < (BlobLogHeader::kSize + BlobLogFooter::kSize)) { + return Status::IOError("File does not have footer", PathName()); + } + + uint64_t footer_offset = file_size_ - BlobLogFooter::kSize; + // assume that ra_file_reader_ is valid before we enter this + assert(ra_file_reader_); + + Slice result; + std::string buf; + AlignedBuf aligned_buf; + Status s; + // TODO: rate limit reading footers from blob files. + if (ra_file_reader_->use_direct_io()) { + s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, + &result, nullptr, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); + } else { + buf.reserve(BlobLogFooter::kSize + 10); + s = ra_file_reader_->Read(IOOptions(), footer_offset, BlobLogFooter::kSize, + &result, &buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); + } + if (!s.ok()) return s; + if (result.size() != BlobLogFooter::kSize) { + // should not happen + return Status::IOError("EOF reached before footer"); + } + + s = bf->DecodeFrom(result); + return s; +} + +Status BlobFile::SetFromFooterLocked(const BlobLogFooter& footer) { + blob_count_ = footer.blob_count; + expiration_range_ = footer.expiration_range; + closed_ = true; + return Status::OK(); +} + +Status BlobFile::Fsync() { + Status s; + if (log_writer_.get()) { + s = log_writer_->Sync(); + } + return s; +} + +void BlobFile::CloseRandomAccessLocked() { + ra_file_reader_.reset(); + last_access_ = -1; +} + +Status BlobFile::GetReader(Env* env, const FileOptions& file_options, + std::shared_ptr<RandomAccessFileReader>* reader, + bool* fresh_open) { + assert(reader != nullptr); + assert(fresh_open != nullptr); + *fresh_open = false; + int64_t current_time = 0; + if (env->GetCurrentTime(¤t_time).ok()) { + last_access_.store(current_time); + } + Status s; + + { + ReadLock lockbfile_r(&mutex_); + if (ra_file_reader_) { + *reader = ra_file_reader_; + return s; + } + } + + WriteLock lockbfile_w(&mutex_); + // Double check. + if (ra_file_reader_) { + *reader = ra_file_reader_; + return s; + } + + std::unique_ptr<FSRandomAccessFile> rfile; + s = env->GetFileSystem()->NewRandomAccessFile(PathName(), file_options, + &rfile, nullptr); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to open blob file for random-read: %s status: '%s'" + " exists: '%s'", + PathName().c_str(), s.ToString().c_str(), + env->FileExists(PathName()).ToString().c_str()); + return s; + } + + ra_file_reader_ = + std::make_shared<RandomAccessFileReader>(std::move(rfile), PathName()); + *reader = ra_file_reader_; + *fresh_open = true; + return s; +} + +Status BlobFile::ReadMetadata(const std::shared_ptr<FileSystem>& fs, + const FileOptions& file_options) { + assert(Immutable()); + // Get file size. + uint64_t file_size = 0; + Status s = + fs->GetFileSize(PathName(), file_options.io_options, &file_size, nullptr); + if (s.ok()) { + file_size_ = file_size; + } else { + ROCKS_LOG_ERROR(info_log_, + "Failed to get size of blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + if (file_size < BlobLogHeader::kSize) { + ROCKS_LOG_ERROR( + info_log_, "Incomplete blob file blob file %" PRIu64 ", size: %" PRIu64, + file_number_, file_size); + return Status::Corruption("Incomplete blob file header."); + } + + // Create file reader. + std::unique_ptr<RandomAccessFileReader> file_reader; + s = RandomAccessFileReader::Create(fs, PathName(), file_options, &file_reader, + nullptr); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to open blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + + // Read file header. + std::string header_buf; + AlignedBuf aligned_buf; + Slice header_slice; + // TODO: rate limit reading headers from blob files. + if (file_reader->use_direct_io()) { + s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, + nullptr, &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); + } else { + header_buf.reserve(BlobLogHeader::kSize); + s = file_reader->Read(IOOptions(), 0, BlobLogHeader::kSize, &header_slice, + &header_buf[0], nullptr, + Env::IO_TOTAL /* rate_limiter_priority */); + } + if (!s.ok()) { + ROCKS_LOG_ERROR( + info_log_, "Failed to read header of blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogHeader header; + s = header.DecodeFrom(header_slice); + if (!s.ok()) { + ROCKS_LOG_ERROR(info_log_, + "Failed to decode header of blob file %" PRIu64 + ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + column_family_id_ = header.column_family_id; + compression_ = header.compression; + has_ttl_ = header.has_ttl; + if (has_ttl_) { + expiration_range_ = header.expiration_range; + } + header_valid_ = true; + + // Read file footer. + if (file_size_ < BlobLogHeader::kSize + BlobLogFooter::kSize) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + std::string footer_buf; + Slice footer_slice; + // TODO: rate limit reading footers from blob files. + if (file_reader->use_direct_io()) { + s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, + BlobLogFooter::kSize, &footer_slice, nullptr, + &aligned_buf, + Env::IO_TOTAL /* rate_limiter_priority */); + } else { + footer_buf.reserve(BlobLogFooter::kSize); + s = file_reader->Read(IOOptions(), file_size - BlobLogFooter::kSize, + BlobLogFooter::kSize, &footer_slice, &footer_buf[0], + nullptr, Env::IO_TOTAL /* rate_limiter_priority */); + } + if (!s.ok()) { + ROCKS_LOG_ERROR( + info_log_, "Failed to read footer of blob file %" PRIu64 ", status: %s", + file_number_, s.ToString().c_str()); + return s; + } + BlobLogFooter footer; + s = footer.DecodeFrom(footer_slice); + if (!s.ok()) { + // OK not to have footer. + assert(!footer_valid_); + return Status::OK(); + } + blob_count_ = footer.blob_count; + if (has_ttl_) { + assert(header.expiration_range.first <= footer.expiration_range.first); + assert(header.expiration_range.second >= footer.expiration_range.second); + expiration_range_ = footer.expiration_range; + } + footer_valid_ = true; + return Status::OK(); +} + +} // namespace blob_db +} // namespace ROCKSDB_NAMESPACE +#endif // ROCKSDB_LITE |