From 483eb2f56657e8e7f419ab1a4fab8dce9ade8609 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 27 Apr 2024 20:24:20 +0200 Subject: Adding upstream version 14.2.21. Signed-off-by: Daniel Baumann --- src/rocksdb/env/env.cc | 425 +++++++++ src/rocksdb/env/env_basic_test.cc | 354 ++++++++ src/rocksdb/env/env_chroot.cc | 321 +++++++ src/rocksdb/env/env_chroot.h | 22 + src/rocksdb/env/env_encryption.cc | 927 +++++++++++++++++++ src/rocksdb/env/env_hdfs.cc | 627 +++++++++++++ src/rocksdb/env/env_posix.cc | 1128 +++++++++++++++++++++++ src/rocksdb/env/env_test.cc | 1774 +++++++++++++++++++++++++++++++++++++ src/rocksdb/env/io_posix.cc | 1082 ++++++++++++++++++++++ src/rocksdb/env/io_posix.h | 258 ++++++ src/rocksdb/env/mock_env.cc | 775 ++++++++++++++++ src/rocksdb/env/mock_env.h | 114 +++ src/rocksdb/env/mock_env_test.cc | 83 ++ src/rocksdb/env/posix_logger.h | 185 ++++ 14 files changed, 8075 insertions(+) create mode 100644 src/rocksdb/env/env.cc create mode 100644 src/rocksdb/env/env_basic_test.cc create mode 100644 src/rocksdb/env/env_chroot.cc create mode 100644 src/rocksdb/env/env_chroot.h create mode 100644 src/rocksdb/env/env_encryption.cc create mode 100644 src/rocksdb/env/env_hdfs.cc create mode 100644 src/rocksdb/env/env_posix.cc create mode 100644 src/rocksdb/env/env_test.cc create mode 100644 src/rocksdb/env/io_posix.cc create mode 100644 src/rocksdb/env/io_posix.h create mode 100644 src/rocksdb/env/mock_env.cc create mode 100644 src/rocksdb/env/mock_env.h create mode 100644 src/rocksdb/env/mock_env_test.cc create mode 100644 src/rocksdb/env/posix_logger.h (limited to 'src/rocksdb/env') diff --git a/src/rocksdb/env/env.cc b/src/rocksdb/env/env.cc new file mode 100644 index 00000000..fde03577 --- /dev/null +++ b/src/rocksdb/env/env.cc @@ -0,0 +1,425 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "rocksdb/env.h" + +#include +#include "options/db_options.h" +#include "port/port.h" +#include "port/sys_time.h" +#include "rocksdb/options.h" +#include "util/arena.h" +#include "util/autovector.h" + +namespace rocksdb { + +Env::~Env() { +} + +std::string Env::PriorityToString(Env::Priority priority) { + switch (priority) { + case Env::Priority::BOTTOM: + return "Bottom"; + case Env::Priority::LOW: + return "Low"; + case Env::Priority::HIGH: + return "High"; + case Env::Priority::USER: + return "User"; + case Env::Priority::TOTAL: + assert(false); + } + return "Invalid"; +} + +uint64_t Env::GetThreadID() const { + std::hash hasher; + return hasher(std::this_thread::get_id()); +} + +Status Env::ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) { + Status s = RenameFile(old_fname, fname); + if (!s.ok()) { + return s; + } + return NewWritableFile(fname, result, options); +} + +Status Env::GetChildrenFileAttributes(const std::string& dir, + std::vector* result) { + assert(result != nullptr); + std::vector child_fnames; + Status s = GetChildren(dir, &child_fnames); + if (!s.ok()) { + return s; + } + result->resize(child_fnames.size()); + size_t result_size = 0; + for (size_t i = 0; i < child_fnames.size(); ++i) { + const std::string path = dir + "/" + child_fnames[i]; + if (!(s = GetFileSize(path, &(*result)[result_size].size_bytes)).ok()) { + if (FileExists(path).IsNotFound()) { + // The file may have been deleted since we listed the directory + continue; + } + return s; + } + (*result)[result_size].name = std::move(child_fnames[i]); + result_size++; + } + result->resize(result_size); + return Status::OK(); +} + +SequentialFile::~SequentialFile() { +} + +RandomAccessFile::~RandomAccessFile() { +} + +WritableFile::~WritableFile() { +} + +MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {} + +Logger::~Logger() {} + +Status Logger::Close() { + if (!closed_) { + closed_ = true; + return CloseImpl(); + } else { + return Status::OK(); + } +} + +Status Logger::CloseImpl() { return Status::NotSupported(); } + +FileLock::~FileLock() { +} + +void LogFlush(Logger *info_log) { + if (info_log) { + info_log->Flush(); + } +} + +static void Logv(Logger *info_log, const char* format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) { + info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap); + } +} + +void Log(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Logv(info_log, format, ap); + va_end(ap); +} + +void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) { + static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN", + "ERROR", "FATAL" }; + if (log_level < log_level_) { + return; + } + + if (log_level == InfoLogLevel::INFO_LEVEL) { + // Doesn't print log level if it is INFO level. + // This is to avoid unexpected performance regression after we add + // the feature of log level. All the logs before we add the feature + // are INFO level. We don't want to add extra costs to those existing + // logging. + Logv(format, ap); + } else if (log_level == InfoLogLevel::HEADER_LEVEL) { + LogHeader(format, ap); + } else { + char new_format[500]; + snprintf(new_format, sizeof(new_format) - 1, "[%s] %s", + kInfoLogLevelNames[log_level], format); + Logv(new_format, ap); + } +} + +static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= log_level) { + if (log_level == InfoLogLevel::HEADER_LEVEL) { + info_log->LogHeader(format, ap); + } else { + info_log->Logv(log_level, format, ap); + } + } +} + +void Log(const InfoLogLevel log_level, Logger* info_log, const char* format, + ...) { + va_list ap; + va_start(ap, format); + Logv(log_level, info_log, format, ap); + va_end(ap); +} + +static void Headerv(Logger *info_log, const char *format, va_list ap) { + if (info_log) { + info_log->LogHeader(format, ap); + } +} + +void Header(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Headerv(info_log, format, ap); + va_end(ap); +} + +static void Debugv(Logger* info_log, const char* format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) { + info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap); + } +} + +void Debug(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Debugv(info_log, format, ap); + va_end(ap); +} + +static void Infov(Logger* info_log, const char* format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) { + info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap); + } +} + +void Info(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Infov(info_log, format, ap); + va_end(ap); +} + +static void Warnv(Logger* info_log, const char* format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::WARN_LEVEL) { + info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap); + } +} + +void Warn(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Warnv(info_log, format, ap); + va_end(ap); +} + +static void Errorv(Logger* info_log, const char* format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::ERROR_LEVEL) { + info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap); + } +} + +void Error(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Errorv(info_log, format, ap); + va_end(ap); +} + +static void Fatalv(Logger* info_log, const char* format, va_list ap) { + if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::FATAL_LEVEL) { + info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap); + } +} + +void Fatal(Logger* info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Fatalv(info_log, format, ap); + va_end(ap); +} + +void LogFlush(const std::shared_ptr& info_log) { + LogFlush(info_log.get()); +} + +void Log(const InfoLogLevel log_level, const std::shared_ptr& info_log, + const char* format, ...) { + va_list ap; + va_start(ap, format); + Logv(log_level, info_log.get(), format, ap); + va_end(ap); +} + +void Header(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Headerv(info_log.get(), format, ap); + va_end(ap); +} + +void Debug(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Debugv(info_log.get(), format, ap); + va_end(ap); +} + +void Info(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Infov(info_log.get(), format, ap); + va_end(ap); +} + +void Warn(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Warnv(info_log.get(), format, ap); + va_end(ap); +} + +void Error(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Errorv(info_log.get(), format, ap); + va_end(ap); +} + +void Fatal(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Fatalv(info_log.get(), format, ap); + va_end(ap); +} + +void Log(const std::shared_ptr& info_log, const char* format, ...) { + va_list ap; + va_start(ap, format); + Logv(info_log.get(), format, ap); + va_end(ap); +} + +Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname, + bool should_sync) { + std::unique_ptr file; + EnvOptions soptions; + Status s = env->NewWritableFile(fname, &file, soptions); + if (!s.ok()) { + return s; + } + s = file->Append(data); + if (s.ok() && should_sync) { + s = file->Sync(); + } + if (!s.ok()) { + env->DeleteFile(fname); + } + return s; +} + +Status ReadFileToString(Env* env, const std::string& fname, std::string* data) { + EnvOptions soptions; + data->clear(); + std::unique_ptr file; + Status s = env->NewSequentialFile(fname, &file, soptions); + if (!s.ok()) { + return s; + } + static const int kBufferSize = 8192; + char* space = new char[kBufferSize]; + while (true) { + Slice fragment; + s = file->Read(kBufferSize, &fragment, space); + if (!s.ok()) { + break; + } + data->append(fragment.data(), fragment.size()); + if (fragment.empty()) { + break; + } + } + delete[] space; + return s; +} + +EnvWrapper::~EnvWrapper() { +} + +namespace { // anonymous namespace + +void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { + env_options->use_mmap_reads = options.allow_mmap_reads; + env_options->use_mmap_writes = options.allow_mmap_writes; + env_options->use_direct_reads = options.use_direct_reads; + env_options->set_fd_cloexec = options.is_fd_close_on_exec; + env_options->bytes_per_sync = options.bytes_per_sync; + env_options->compaction_readahead_size = options.compaction_readahead_size; + env_options->random_access_max_buffer_size = + options.random_access_max_buffer_size; + env_options->rate_limiter = options.rate_limiter.get(); + env_options->writable_file_max_buffer_size = + options.writable_file_max_buffer_size; + env_options->allow_fallocate = options.allow_fallocate; +} + +} + +EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync; + optimized_env_options.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; + return optimized_env_options; +} + +EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const { + return env_options; +} + +EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_reads = false; + return optimized_env_options; +} + +EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_reads = false; + return optimized_env_options; +} + +EnvOptions Env::OptimizeForCompactionTableWrite( + const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_writes = + db_options.use_direct_io_for_flush_and_compaction; + return optimized_env_options; +} + +EnvOptions Env::OptimizeForCompactionTableRead( + const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_reads = db_options.use_direct_reads; + return optimized_env_options; +} + +EnvOptions::EnvOptions(const DBOptions& options) { + AssignEnvOptions(this, options); +} + +EnvOptions::EnvOptions() { + DBOptions options; + AssignEnvOptions(this, options); +} + + +} // namespace rocksdb diff --git a/src/rocksdb/env/env_basic_test.cc b/src/rocksdb/env/env_basic_test.cc new file mode 100644 index 00000000..3efae758a --- /dev/null +++ b/src/rocksdb/env/env_basic_test.cc @@ -0,0 +1,354 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include +#include +#include +#include + +#include "env/mock_env.h" +#include "rocksdb/env.h" +#include "rocksdb/utilities/object_registry.h" +#include "util/testharness.h" + +namespace rocksdb { + +// Normalizes trivial differences across Envs such that these test cases can +// run on all Envs. +class NormalizingEnvWrapper : public EnvWrapper { + public: + explicit NormalizingEnvWrapper(Env* base) : EnvWrapper(base) {} + + // Removes . and .. from directory listing + Status GetChildren(const std::string& dir, + std::vector* result) override { + Status status = EnvWrapper::GetChildren(dir, result); + if (status.ok()) { + result->erase(std::remove_if(result->begin(), result->end(), + [](const std::string& s) { + return s == "." || s == ".."; + }), + result->end()); + } + return status; + } + + // Removes . and .. from directory listing + Status GetChildrenFileAttributes( + const std::string& dir, std::vector* result) override { + Status status = EnvWrapper::GetChildrenFileAttributes(dir, result); + if (status.ok()) { + result->erase(std::remove_if(result->begin(), result->end(), + [](const FileAttributes& fa) { + return fa.name == "." || fa.name == ".."; + }), + result->end()); + } + return status; + } +}; + +class EnvBasicTestWithParam : public testing::Test, + public ::testing::WithParamInterface { + public: + Env* env_; + const EnvOptions soptions_; + std::string test_dir_; + + EnvBasicTestWithParam() : env_(GetParam()) { + test_dir_ = test::PerThreadDBPath(env_, "env_basic_test"); + } + + void SetUp() override { env_->CreateDirIfMissing(test_dir_); } + + void TearDown() override { + std::vector files; + env_->GetChildren(test_dir_, &files); + for (const auto& file : files) { + // don't know whether it's file or directory, try both. The tests must + // only create files or empty directories, so one must succeed, else the + // directory's corrupted. + Status s = env_->DeleteFile(test_dir_ + "/" + file); + if (!s.ok()) { + ASSERT_OK(env_->DeleteDir(test_dir_ + "/" + file)); + } + } + } +}; + +class EnvMoreTestWithParam : public EnvBasicTestWithParam {}; + +static std::unique_ptr def_env(new NormalizingEnvWrapper(Env::Default())); +INSTANTIATE_TEST_CASE_P(EnvDefault, EnvBasicTestWithParam, + ::testing::Values(def_env.get())); +INSTANTIATE_TEST_CASE_P(EnvDefault, EnvMoreTestWithParam, + ::testing::Values(def_env.get())); + +static std::unique_ptr mock_env(new MockEnv(Env::Default())); +INSTANTIATE_TEST_CASE_P(MockEnv, EnvBasicTestWithParam, + ::testing::Values(mock_env.get())); +#ifndef ROCKSDB_LITE +static std::unique_ptr mem_env(NewMemEnv(Env::Default())); +INSTANTIATE_TEST_CASE_P(MemEnv, EnvBasicTestWithParam, + ::testing::Values(mem_env.get())); + +namespace { + +// Returns a vector of 0 or 1 Env*, depending whether an Env is registered for +// TEST_ENV_URI. +// +// The purpose of returning an empty vector (instead of nullptr) is that gtest +// ValuesIn() will skip running tests when given an empty collection. +std::vector GetCustomEnvs() { + static Env* custom_env; + static std::unique_ptr custom_env_guard; + static bool init = false; + if (!init) { + init = true; + const char* uri = getenv("TEST_ENV_URI"); + if (uri != nullptr) { + custom_env = NewCustomObject(uri, &custom_env_guard); + } + } + + std::vector res; + if (custom_env != nullptr) { + res.emplace_back(custom_env); + } + return res; +} + +} // anonymous namespace + +INSTANTIATE_TEST_CASE_P(CustomEnv, EnvBasicTestWithParam, + ::testing::ValuesIn(GetCustomEnvs())); + +INSTANTIATE_TEST_CASE_P(CustomEnv, EnvMoreTestWithParam, + ::testing::ValuesIn(GetCustomEnvs())); + +#endif // ROCKSDB_LITE + +TEST_P(EnvBasicTestWithParam, Basics) { + uint64_t file_size; + std::unique_ptr writable_file; + std::vector children; + + // Check that the directory is empty. + ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/non_existent")); + ASSERT_TRUE(!env_->GetFileSize(test_dir_ + "/non_existent", &file_size).ok()); + ASSERT_OK(env_->GetChildren(test_dir_, &children)); + ASSERT_EQ(0U, children.size()); + + // Create a file. + ASSERT_OK(env_->NewWritableFile(test_dir_ + "/f", &writable_file, soptions_)); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); + + // Check that the file exists. + ASSERT_OK(env_->FileExists(test_dir_ + "/f")); + ASSERT_OK(env_->GetFileSize(test_dir_ + "/f", &file_size)); + ASSERT_EQ(0U, file_size); + ASSERT_OK(env_->GetChildren(test_dir_, &children)); + ASSERT_EQ(1U, children.size()); + ASSERT_EQ("f", children[0]); + ASSERT_OK(env_->DeleteFile(test_dir_ + "/f")); + + // Write to the file. + ASSERT_OK( + env_->NewWritableFile(test_dir_ + "/f1", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("abc")); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); + ASSERT_OK( + env_->NewWritableFile(test_dir_ + "/f2", &writable_file, soptions_)); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); + + // Check for expected size. + ASSERT_OK(env_->GetFileSize(test_dir_ + "/f1", &file_size)); + ASSERT_EQ(3U, file_size); + + // Check that renaming works. + ASSERT_TRUE( + !env_->RenameFile(test_dir_ + "/non_existent", test_dir_ + "/g").ok()); + ASSERT_OK(env_->RenameFile(test_dir_ + "/f1", test_dir_ + "/g")); + ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/f1")); + ASSERT_OK(env_->FileExists(test_dir_ + "/g")); + ASSERT_OK(env_->GetFileSize(test_dir_ + "/g", &file_size)); + ASSERT_EQ(3U, file_size); + + // Check that renaming overwriting works + ASSERT_OK(env_->RenameFile(test_dir_ + "/f2", test_dir_ + "/g")); + ASSERT_OK(env_->GetFileSize(test_dir_ + "/g", &file_size)); + ASSERT_EQ(0U, file_size); + + // Check that opening non-existent file fails. + std::unique_ptr seq_file; + std::unique_ptr rand_file; + ASSERT_TRUE(!env_->NewSequentialFile(test_dir_ + "/non_existent", &seq_file, + soptions_) + .ok()); + ASSERT_TRUE(!seq_file); + ASSERT_TRUE(!env_->NewRandomAccessFile(test_dir_ + "/non_existent", + &rand_file, soptions_) + .ok()); + ASSERT_TRUE(!rand_file); + + // Check that deleting works. + ASSERT_TRUE(!env_->DeleteFile(test_dir_ + "/non_existent").ok()); + ASSERT_OK(env_->DeleteFile(test_dir_ + "/g")); + ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/g")); + ASSERT_OK(env_->GetChildren(test_dir_, &children)); + ASSERT_EQ(0U, children.size()); + ASSERT_TRUE( + env_->GetChildren(test_dir_ + "/non_existent", &children).IsNotFound()); +} + +TEST_P(EnvBasicTestWithParam, ReadWrite) { + std::unique_ptr writable_file; + std::unique_ptr seq_file; + std::unique_ptr rand_file; + Slice result; + char scratch[100]; + + ASSERT_OK(env_->NewWritableFile(test_dir_ + "/f", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("hello ")); + ASSERT_OK(writable_file->Append("world")); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); + + // Read sequentially. + ASSERT_OK(env_->NewSequentialFile(test_dir_ + "/f", &seq_file, soptions_)); + ASSERT_OK(seq_file->Read(5, &result, scratch)); // Read "hello". + ASSERT_EQ(0, result.compare("hello")); + ASSERT_OK(seq_file->Skip(1)); + ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Read "world". + ASSERT_EQ(0, result.compare("world")); + ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Try reading past EOF. + ASSERT_EQ(0U, result.size()); + ASSERT_OK(seq_file->Skip(100)); // Try to skip past end of file. + ASSERT_OK(seq_file->Read(1000, &result, scratch)); + ASSERT_EQ(0U, result.size()); + + // Random reads. + ASSERT_OK(env_->NewRandomAccessFile(test_dir_ + "/f", &rand_file, soptions_)); + ASSERT_OK(rand_file->Read(6, 5, &result, scratch)); // Read "world". + ASSERT_EQ(0, result.compare("world")); + ASSERT_OK(rand_file->Read(0, 5, &result, scratch)); // Read "hello". + ASSERT_EQ(0, result.compare("hello")); + ASSERT_OK(rand_file->Read(10, 100, &result, scratch)); // Read "d". + ASSERT_EQ(0, result.compare("d")); + + // Too high offset. + ASSERT_TRUE(rand_file->Read(1000, 5, &result, scratch).ok()); +} + +TEST_P(EnvBasicTestWithParam, Misc) { + std::unique_ptr writable_file; + ASSERT_OK(env_->NewWritableFile(test_dir_ + "/b", &writable_file, soptions_)); + + // These are no-ops, but we test they return success. + ASSERT_OK(writable_file->Sync()); + ASSERT_OK(writable_file->Flush()); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); +} + +TEST_P(EnvBasicTestWithParam, LargeWrite) { + const size_t kWriteSize = 300 * 1024; + char* scratch = new char[kWriteSize * 2]; + + std::string write_data; + for (size_t i = 0; i < kWriteSize; ++i) { + write_data.append(1, static_cast(i)); + } + + std::unique_ptr writable_file; + ASSERT_OK(env_->NewWritableFile(test_dir_ + "/f", &writable_file, soptions_)); + ASSERT_OK(writable_file->Append("foo")); + ASSERT_OK(writable_file->Append(write_data)); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); + + std::unique_ptr seq_file; + Slice result; + ASSERT_OK(env_->NewSequentialFile(test_dir_ + "/f", &seq_file, soptions_)); + ASSERT_OK(seq_file->Read(3, &result, scratch)); // Read "foo". + ASSERT_EQ(0, result.compare("foo")); + + size_t read = 0; + std::string read_data; + while (read < kWriteSize) { + ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch)); + read_data.append(result.data(), result.size()); + read += result.size(); + } + ASSERT_TRUE(write_data == read_data); + delete [] scratch; +} + +TEST_P(EnvMoreTestWithParam, GetModTime) { + ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/dir1")); + uint64_t mtime1 = 0x0; + ASSERT_OK(env_->GetFileModificationTime(test_dir_ + "/dir1", &mtime1)); +} + +TEST_P(EnvMoreTestWithParam, MakeDir) { + ASSERT_OK(env_->CreateDir(test_dir_ + "/j")); + ASSERT_OK(env_->FileExists(test_dir_ + "/j")); + std::vector children; + env_->GetChildren(test_dir_, &children); + ASSERT_EQ(1U, children.size()); + // fail because file already exists + ASSERT_TRUE(!env_->CreateDir(test_dir_ + "/j").ok()); + ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/j")); + ASSERT_OK(env_->DeleteDir(test_dir_ + "/j")); + ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/j")); +} + +TEST_P(EnvMoreTestWithParam, GetChildren) { + // empty folder returns empty vector + std::vector children; + std::vector childAttr; + ASSERT_OK(env_->CreateDirIfMissing(test_dir_)); + ASSERT_OK(env_->GetChildren(test_dir_, &children)); + ASSERT_OK(env_->FileExists(test_dir_)); + ASSERT_OK(env_->GetChildrenFileAttributes(test_dir_, &childAttr)); + ASSERT_EQ(0U, children.size()); + ASSERT_EQ(0U, childAttr.size()); + + // folder with contents returns relative path to test dir + ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/niu")); + ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/you")); + ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/guo")); + ASSERT_OK(env_->GetChildren(test_dir_, &children)); + ASSERT_OK(env_->GetChildrenFileAttributes(test_dir_, &childAttr)); + ASSERT_EQ(3U, children.size()); + ASSERT_EQ(3U, childAttr.size()); + for (auto each : children) { + env_->DeleteDir(test_dir_ + "/" + each); + } // necessary for default POSIX env + + // non-exist directory returns IOError + ASSERT_OK(env_->DeleteDir(test_dir_)); + ASSERT_TRUE(!env_->FileExists(test_dir_).ok()); + ASSERT_TRUE(!env_->GetChildren(test_dir_, &children).ok()); + ASSERT_TRUE(!env_->GetChildrenFileAttributes(test_dir_, &childAttr).ok()); + + // if dir is a file, returns IOError + ASSERT_OK(env_->CreateDir(test_dir_)); + std::unique_ptr writable_file; + ASSERT_OK( + env_->NewWritableFile(test_dir_ + "/file", &writable_file, soptions_)); + ASSERT_OK(writable_file->Close()); + writable_file.reset(); + ASSERT_TRUE(!env_->GetChildren(test_dir_ + "/file", &children).ok()); + ASSERT_EQ(0U, children.size()); +} + +} // namespace rocksdb +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/env/env_chroot.cc b/src/rocksdb/env/env_chroot.cc new file mode 100644 index 00000000..8a7fb449 --- /dev/null +++ b/src/rocksdb/env/env_chroot.cc @@ -0,0 +1,321 @@ +// Copyright (c) 2016-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#if !defined(ROCKSDB_LITE) && !defined(OS_WIN) + +#include "env/env_chroot.h" + +#include +#include +#include +#include + +#include +#include +#include + +#include "rocksdb/status.h" + +namespace rocksdb { + +class ChrootEnv : public EnvWrapper { + public: + ChrootEnv(Env* base_env, const std::string& chroot_dir) + : EnvWrapper(base_env) { +#if defined(OS_AIX) + char resolvedName[PATH_MAX]; + char* real_chroot_dir = realpath(chroot_dir.c_str(), resolvedName); +#else + char* real_chroot_dir = realpath(chroot_dir.c_str(), nullptr); +#endif + // chroot_dir must exist so realpath() returns non-nullptr. + assert(real_chroot_dir != nullptr); + chroot_dir_ = real_chroot_dir; +#if !defined(OS_AIX) + free(real_chroot_dir); +#endif + } + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewSequentialFile(status_and_enc_path.second, result, + options); + } + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewRandomAccessFile(status_and_enc_path.second, result, + options); + } + + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewWritableFile(status_and_enc_path.second, result, + options); + } + + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + auto status_and_old_enc_path = EncodePath(old_fname); + if (!status_and_old_enc_path.first.ok()) { + return status_and_old_enc_path.first; + } + return EnvWrapper::ReuseWritableFile(status_and_old_enc_path.second, + status_and_old_enc_path.second, result, + options); + } + + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewRandomRWFile(status_and_enc_path.second, result, + options); + } + + Status NewDirectory(const std::string& dir, + std::unique_ptr* result) override { + auto status_and_enc_path = EncodePathWithNewBasename(dir); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewDirectory(status_and_enc_path.second, result); + } + + Status FileExists(const std::string& fname) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::FileExists(status_and_enc_path.second); + } + + Status GetChildren(const std::string& dir, + std::vector* result) override { + auto status_and_enc_path = EncodePath(dir); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::GetChildren(status_and_enc_path.second, result); + } + + Status GetChildrenFileAttributes( + const std::string& dir, std::vector* result) override { + auto status_and_enc_path = EncodePath(dir); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::GetChildrenFileAttributes(status_and_enc_path.second, + result); + } + + Status DeleteFile(const std::string& fname) override { + auto status_and_enc_path = EncodePath(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::DeleteFile(status_and_enc_path.second); + } + + Status CreateDir(const std::string& dirname) override { + auto status_and_enc_path = EncodePathWithNewBasename(dirname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::CreateDir(status_and_enc_path.second); + } + + Status CreateDirIfMissing(const std::string& dirname) override { + auto status_and_enc_path = EncodePathWithNewBasename(dirname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::CreateDirIfMissing(status_and_enc_path.second); + } + + Status DeleteDir(const std::string& dirname) override { + auto status_and_enc_path = EncodePath(dirname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::DeleteDir(status_and_enc_path.second); + } + + Status GetFileSize(const std::string& fname, uint64_t* file_size) override { + auto status_and_enc_path = EncodePath(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::GetFileSize(status_and_enc_path.second, file_size); + } + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override { + auto status_and_enc_path = EncodePath(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::GetFileModificationTime(status_and_enc_path.second, + file_mtime); + } + + Status RenameFile(const std::string& src, const std::string& dest) override { + auto status_and_src_enc_path = EncodePath(src); + if (!status_and_src_enc_path.first.ok()) { + return status_and_src_enc_path.first; + } + auto status_and_dest_enc_path = EncodePathWithNewBasename(dest); + if (!status_and_dest_enc_path.first.ok()) { + return status_and_dest_enc_path.first; + } + return EnvWrapper::RenameFile(status_and_src_enc_path.second, + status_and_dest_enc_path.second); + } + + Status LinkFile(const std::string& src, const std::string& dest) override { + auto status_and_src_enc_path = EncodePath(src); + if (!status_and_src_enc_path.first.ok()) { + return status_and_src_enc_path.first; + } + auto status_and_dest_enc_path = EncodePathWithNewBasename(dest); + if (!status_and_dest_enc_path.first.ok()) { + return status_and_dest_enc_path.first; + } + return EnvWrapper::LinkFile(status_and_src_enc_path.second, + status_and_dest_enc_path.second); + } + + Status LockFile(const std::string& fname, FileLock** lock) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + // FileLock subclasses may store path (e.g., PosixFileLock stores it). We + // can skip stripping the chroot directory from this path because callers + // shouldn't use it. + return EnvWrapper::LockFile(status_and_enc_path.second, lock); + } + + Status GetTestDirectory(std::string* path) override { + // Adapted from PosixEnv's implementation since it doesn't provide a way to + // create directory in the chroot. + char buf[256]; + snprintf(buf, sizeof(buf), "/rocksdbtest-%d", static_cast(geteuid())); + *path = buf; + + // Directory may already exist, so ignore return + CreateDir(*path); + return Status::OK(); + } + + Status NewLogger(const std::string& fname, + std::shared_ptr* result) override { + auto status_and_enc_path = EncodePathWithNewBasename(fname); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::NewLogger(status_and_enc_path.second, result); + } + + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override { + auto status_and_enc_path = EncodePath(db_path); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + return EnvWrapper::GetAbsolutePath(status_and_enc_path.second, output_path); + } + + private: + // Returns status and expanded absolute path including the chroot directory. + // Checks whether the provided path breaks out of the chroot. If it returns + // non-OK status, the returned path should not be used. + std::pair EncodePath(const std::string& path) { + if (path.empty() || path[0] != '/') { + return {Status::InvalidArgument(path, "Not an absolute path"), ""}; + } + std::pair res; + res.second = chroot_dir_ + path; +#if defined(OS_AIX) + char resolvedName[PATH_MAX]; + char* normalized_path = realpath(res.second.c_str(), resolvedName); +#else + char* normalized_path = realpath(res.second.c_str(), nullptr); +#endif + if (normalized_path == nullptr) { + res.first = Status::NotFound(res.second, strerror(errno)); + } else if (strlen(normalized_path) < chroot_dir_.size() || + strncmp(normalized_path, chroot_dir_.c_str(), + chroot_dir_.size()) != 0) { + res.first = Status::IOError(res.second, + "Attempted to access path outside chroot"); + } else { + res.first = Status::OK(); + } +#if !defined(OS_AIX) + free(normalized_path); +#endif + return res; + } + + // Similar to EncodePath() except assumes the basename in the path hasn't been + // created yet. + std::pair EncodePathWithNewBasename( + const std::string& path) { + if (path.empty() || path[0] != '/') { + return {Status::InvalidArgument(path, "Not an absolute path"), ""}; + } + // Basename may be followed by trailing slashes + size_t final_idx = path.find_last_not_of('/'); + if (final_idx == std::string::npos) { + // It's only slashes so no basename to extract + return EncodePath(path); + } + + // Pull off the basename temporarily since realname(3) (used by + // EncodePath()) requires a path that exists + size_t base_sep = path.rfind('/', final_idx); + auto status_and_enc_path = EncodePath(path.substr(0, base_sep + 1)); + status_and_enc_path.second.append(path.substr(base_sep + 1)); + return status_and_enc_path; + } + + std::string chroot_dir_; +}; + +Env* NewChrootEnv(Env* base_env, const std::string& chroot_dir) { + if (!base_env->FileExists(chroot_dir).ok()) { + return nullptr; + } + return new ChrootEnv(base_env, chroot_dir); +} + +} // namespace rocksdb + +#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) diff --git a/src/rocksdb/env/env_chroot.h b/src/rocksdb/env/env_chroot.h new file mode 100644 index 00000000..b2760bc0 --- /dev/null +++ b/src/rocksdb/env/env_chroot.h @@ -0,0 +1,22 @@ +// Copyright (c) 2016-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#if !defined(ROCKSDB_LITE) && !defined(OS_WIN) + +#include + +#include "rocksdb/env.h" + +namespace rocksdb { + +// Returns an Env that translates paths such that the root directory appears to +// be chroot_dir. chroot_dir should refer to an existing directory. +Env* NewChrootEnv(Env* base_env, const std::string& chroot_dir); + +} // namespace rocksdb + +#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) diff --git a/src/rocksdb/env/env_encryption.cc b/src/rocksdb/env/env_encryption.cc new file mode 100644 index 00000000..aa59e663 --- /dev/null +++ b/src/rocksdb/env/env_encryption.cc @@ -0,0 +1,927 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include +#include +#include +#include + +#include "rocksdb/env_encryption.h" +#include "util/aligned_buffer.h" +#include "util/coding.h" +#include "util/random.h" + +#endif + +namespace rocksdb { + +#ifndef ROCKSDB_LITE + +class EncryptedSequentialFile : public SequentialFile { + private: + std::unique_ptr file_; + std::unique_ptr stream_; + uint64_t offset_; + size_t prefixLength_; + + public: + // Default ctor. Given underlying sequential file is supposed to be at + // offset == prefixLength. + EncryptedSequentialFile(SequentialFile* f, BlockAccessCipherStream* s, size_t prefixLength) + : file_(f), stream_(s), offset_(prefixLength), prefixLength_(prefixLength) { + } + + // Read up to "n" bytes from the file. "scratch[0..n-1]" may be + // written by this routine. Sets "*result" to the data that was + // read (including if fewer than "n" bytes were successfully read). + // May set "*result" to point at data in "scratch[0..n-1]", so + // "scratch[0..n-1]" must be live when "*result" is used. + // If an error was encountered, returns a non-OK status. + // + // REQUIRES: External synchronization + Status Read(size_t n, Slice* result, char* scratch) override { + assert(scratch); + Status status = file_->Read(n, result, scratch); + if (!status.ok()) { + return status; + } + status = stream_->Decrypt(offset_, (char*)result->data(), result->size()); + offset_ += result->size(); // We've already ready data from disk, so update offset_ even if decryption fails. + return status; + } + + // Skip "n" bytes from the file. This is guaranteed to be no + // slower that reading the same data, but may be faster. + // + // If end of file is reached, skipping will stop at the end of the + // file, and Skip will return OK. + // + // REQUIRES: External synchronization + Status Skip(uint64_t n) override { + auto status = file_->Skip(n); + if (!status.ok()) { + return status; + } + offset_ += n; + return status; + } + + // Indicates the upper layers if the current SequentialFile implementation + // uses direct IO. + bool use_direct_io() const override { return file_->use_direct_io(); } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + size_t GetRequiredBufferAlignment() const override { + return file_->GetRequiredBufferAlignment(); + } + + // Remove any kind of caching of data from the offset to offset+length + // of this file. If the length is 0, then it refers to the end of file. + // If the system is not caching the file contents, then this is a noop. + Status InvalidateCache(size_t offset, size_t length) override { + return file_->InvalidateCache(offset + prefixLength_, length); + } + + // Positioned Read for direct I/O + // If Direct I/O enabled, offset, n, and scratch should be properly aligned + Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override { + assert(scratch); + offset += prefixLength_; // Skip prefix + auto status = file_->PositionedRead(offset, n, result, scratch); + if (!status.ok()) { + return status; + } + offset_ = offset + result->size(); + status = stream_->Decrypt(offset, (char*)result->data(), result->size()); + return status; + } +}; + +// A file abstraction for randomly reading the contents of a file. +class EncryptedRandomAccessFile : public RandomAccessFile { + private: + std::unique_ptr file_; + std::unique_ptr stream_; + size_t prefixLength_; + + public: + EncryptedRandomAccessFile(RandomAccessFile* f, BlockAccessCipherStream* s, size_t prefixLength) + : file_(f), stream_(s), prefixLength_(prefixLength) { } + + // Read up to "n" bytes from the file starting at "offset". + // "scratch[0..n-1]" may be written by this routine. Sets "*result" + // to the data that was read (including if fewer than "n" bytes were + // successfully read). May set "*result" to point at data in + // "scratch[0..n-1]", so "scratch[0..n-1]" must be live when + // "*result" is used. If an error was encountered, returns a non-OK + // status. + // + // Safe for concurrent use by multiple threads. + // If Direct I/O enabled, offset, n, and scratch should be aligned properly. + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + assert(scratch); + offset += prefixLength_; + auto status = file_->Read(offset, n, result, scratch); + if (!status.ok()) { + return status; + } + status = stream_->Decrypt(offset, (char*)result->data(), result->size()); + return status; + } + + // Readahead the file starting from offset by n bytes for caching. + Status Prefetch(uint64_t offset, size_t n) override { + //return Status::OK(); + return file_->Prefetch(offset + prefixLength_, n); + } + + // Tries to get an unique ID for this file that will be the same each time + // the file is opened (and will stay the same while the file is open). + // Furthermore, it tries to make this ID at most "max_size" bytes. If such an + // ID can be created this function returns the length of the ID and places it + // in "id"; otherwise, this function returns 0, in which case "id" + // may not have been modified. + // + // This function guarantees, for IDs from a given environment, two unique ids + // cannot be made equal to each other by adding arbitrary bytes to one of + // them. That is, no unique ID is the prefix of another. + // + // This function guarantees that the returned ID will not be interpretable as + // a single varint. + // + // Note: these IDs are only valid for the duration of the process. + size_t GetUniqueId(char* id, size_t max_size) const override { + return file_->GetUniqueId(id, max_size); + }; + + void Hint(AccessPattern pattern) override { file_->Hint(pattern); } + + // Indicates the upper layers if the current RandomAccessFile implementation + // uses direct IO. + bool use_direct_io() const override { return file_->use_direct_io(); } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + size_t GetRequiredBufferAlignment() const override { + return file_->GetRequiredBufferAlignment(); + } + + // Remove any kind of caching of data from the offset to offset+length + // of this file. If the length is 0, then it refers to the end of file. + // If the system is not caching the file contents, then this is a noop. + Status InvalidateCache(size_t offset, size_t length) override { + return file_->InvalidateCache(offset + prefixLength_, length); + } +}; + +// A file abstraction for sequential writing. The implementation +// must provide buffering since callers may append small fragments +// at a time to the file. +class EncryptedWritableFile : public WritableFileWrapper { + private: + std::unique_ptr file_; + std::unique_ptr stream_; + size_t prefixLength_; + + public: + // Default ctor. Prefix is assumed to be written already. + EncryptedWritableFile(WritableFile* f, BlockAccessCipherStream* s, size_t prefixLength) + : WritableFileWrapper(f), file_(f), stream_(s), prefixLength_(prefixLength) { } + + Status Append(const Slice& data) override { + AlignedBuffer buf; + Status status; + Slice dataToAppend(data); + if (data.size() > 0) { + auto offset = file_->GetFileSize(); // size including prefix + // Encrypt in cloned buffer + buf.Alignment(GetRequiredBufferAlignment()); + buf.AllocateNewBuffer(data.size()); + memmove(buf.BufferStart(), data.data(), data.size()); + status = stream_->Encrypt(offset, buf.BufferStart(), data.size()); + if (!status.ok()) { + return status; + } + dataToAppend = Slice(buf.BufferStart(), data.size()); + } + status = file_->Append(dataToAppend); + if (!status.ok()) { + return status; + } + return status; + } + + Status PositionedAppend(const Slice& data, uint64_t offset) override { + AlignedBuffer buf; + Status status; + Slice dataToAppend(data); + offset += prefixLength_; + if (data.size() > 0) { + // Encrypt in cloned buffer + buf.Alignment(GetRequiredBufferAlignment()); + buf.AllocateNewBuffer(data.size()); + memmove(buf.BufferStart(), data.data(), data.size()); + status = stream_->Encrypt(offset, buf.BufferStart(), data.size()); + if (!status.ok()) { + return status; + } + dataToAppend = Slice(buf.BufferStart(), data.size()); + } + status = file_->PositionedAppend(dataToAppend, offset); + if (!status.ok()) { + return status; + } + return status; + } + + // Indicates the upper layers if the current WritableFile implementation + // uses direct IO. + bool use_direct_io() const override { return file_->use_direct_io(); } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + size_t GetRequiredBufferAlignment() const override { + return file_->GetRequiredBufferAlignment(); + } + + /* + * Get the size of valid data in the file. + */ + uint64_t GetFileSize() override { + return file_->GetFileSize() - prefixLength_; + } + + // Truncate is necessary to trim the file to the correct size + // before closing. It is not always possible to keep track of the file + // size due to whole pages writes. The behavior is undefined if called + // with other writes to follow. + Status Truncate(uint64_t size) override { + return file_->Truncate(size + prefixLength_); + } + + // Remove any kind of caching of data from the offset to offset+length + // of this file. If the length is 0, then it refers to the end of file. + // If the system is not caching the file contents, then this is a noop. + // This call has no effect on dirty pages in the cache. + Status InvalidateCache(size_t offset, size_t length) override { + return file_->InvalidateCache(offset + prefixLength_, length); + } + + // Sync a file range with disk. + // offset is the starting byte of the file range to be synchronized. + // nbytes specifies the length of the range to be synchronized. + // This asks the OS to initiate flushing the cached data to disk, + // without waiting for completion. + // Default implementation does nothing. + Status RangeSync(uint64_t offset, uint64_t nbytes) override { + return file_->RangeSync(offset + prefixLength_, nbytes); + } + + // PrepareWrite performs any necessary preparation for a write + // before the write actually occurs. This allows for pre-allocation + // of space on devices where it can result in less file + // fragmentation and/or less waste from over-zealous filesystem + // pre-allocation. + void PrepareWrite(size_t offset, size_t len) override { + file_->PrepareWrite(offset + prefixLength_, len); + } + + // Pre-allocates space for a file. + Status Allocate(uint64_t offset, uint64_t len) override { + return file_->Allocate(offset + prefixLength_, len); + } +}; + +// A file abstraction for random reading and writing. +class EncryptedRandomRWFile : public RandomRWFile { + private: + std::unique_ptr file_; + std::unique_ptr stream_; + size_t prefixLength_; + + public: + EncryptedRandomRWFile(RandomRWFile* f, BlockAccessCipherStream* s, size_t prefixLength) + : file_(f), stream_(s), prefixLength_(prefixLength) {} + + // Indicates if the class makes use of direct I/O + // If false you must pass aligned buffer to Write() + bool use_direct_io() const override { return file_->use_direct_io(); } + + // Use the returned alignment value to allocate + // aligned buffer for Direct I/O + size_t GetRequiredBufferAlignment() const override { + return file_->GetRequiredBufferAlignment(); + } + + // Write bytes in `data` at offset `offset`, Returns Status::OK() on success. + // Pass aligned buffer when use_direct_io() returns true. + Status Write(uint64_t offset, const Slice& data) override { + AlignedBuffer buf; + Status status; + Slice dataToWrite(data); + offset += prefixLength_; + if (data.size() > 0) { + // Encrypt in cloned buffer + buf.Alignment(GetRequiredBufferAlignment()); + buf.AllocateNewBuffer(data.size()); + memmove(buf.BufferStart(), data.data(), data.size()); + status = stream_->Encrypt(offset, buf.BufferStart(), data.size()); + if (!status.ok()) { + return status; + } + dataToWrite = Slice(buf.BufferStart(), data.size()); + } + status = file_->Write(offset, dataToWrite); + return status; + } + + // Read up to `n` bytes starting from offset `offset` and store them in + // result, provided `scratch` size should be at least `n`. + // Returns Status::OK() on success. + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + assert(scratch); + offset += prefixLength_; + auto status = file_->Read(offset, n, result, scratch); + if (!status.ok()) { + return status; + } + status = stream_->Decrypt(offset, (char*)result->data(), result->size()); + return status; + } + + Status Flush() override { return file_->Flush(); } + + Status Sync() override { return file_->Sync(); } + + Status Fsync() override { return file_->Fsync(); } + + Status Close() override { return file_->Close(); } +}; + +// EncryptedEnv implements an Env wrapper that adds encryption to files stored on disk. +class EncryptedEnv : public EnvWrapper { + public: + EncryptedEnv(Env* base_env, EncryptionProvider *provider) + : EnvWrapper(base_env) { + provider_ = provider; + } + + // NewSequentialFile opens a file for sequential reading. + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + if (options.use_mmap_reads) { + return Status::InvalidArgument(); + } + // Open file using underlying Env implementation + std::unique_ptr underlying; + auto status = EnvWrapper::NewSequentialFile(fname, &underlying, options); + if (!status.ok()) { + return status; + } + // Read prefix (if needed) + AlignedBuffer prefixBuf; + Slice prefixSlice; + size_t prefixLength = provider_->GetPrefixLength(); + if (prefixLength > 0) { + // Read prefix + prefixBuf.Alignment(underlying->GetRequiredBufferAlignment()); + prefixBuf.AllocateNewBuffer(prefixLength); + status = underlying->Read(prefixLength, &prefixSlice, prefixBuf.BufferStart()); + if (!status.ok()) { + return status; + } + } + // Create cipher stream + std::unique_ptr stream; + status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream); + if (!status.ok()) { + return status; + } + (*result) = std::unique_ptr(new EncryptedSequentialFile(underlying.release(), stream.release(), prefixLength)); + return Status::OK(); + } + + // NewRandomAccessFile opens a file for random read access. + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + if (options.use_mmap_reads) { + return Status::InvalidArgument(); + } + // Open file using underlying Env implementation + std::unique_ptr underlying; + auto status = EnvWrapper::NewRandomAccessFile(fname, &underlying, options); + if (!status.ok()) { + return status; + } + // Read prefix (if needed) + AlignedBuffer prefixBuf; + Slice prefixSlice; + size_t prefixLength = provider_->GetPrefixLength(); + if (prefixLength > 0) { + // Read prefix + prefixBuf.Alignment(underlying->GetRequiredBufferAlignment()); + prefixBuf.AllocateNewBuffer(prefixLength); + status = underlying->Read(0, prefixLength, &prefixSlice, prefixBuf.BufferStart()); + if (!status.ok()) { + return status; + } + } + // Create cipher stream + std::unique_ptr stream; + status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream); + if (!status.ok()) { + return status; + } + (*result) = std::unique_ptr(new EncryptedRandomAccessFile(underlying.release(), stream.release(), prefixLength)); + return Status::OK(); + } + + // NewWritableFile opens a file for sequential writing. + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + if (options.use_mmap_writes) { + return Status::InvalidArgument(); + } + // Open file using underlying Env implementation + std::unique_ptr underlying; + Status status = EnvWrapper::NewWritableFile(fname, &underlying, options); + if (!status.ok()) { + return status; + } + // Initialize & write prefix (if needed) + AlignedBuffer prefixBuf; + Slice prefixSlice; + size_t prefixLength = provider_->GetPrefixLength(); + if (prefixLength > 0) { + // Initialize prefix + prefixBuf.Alignment(underlying->GetRequiredBufferAlignment()); + prefixBuf.AllocateNewBuffer(prefixLength); + provider_->CreateNewPrefix(fname, prefixBuf.BufferStart(), prefixLength); + prefixSlice = Slice(prefixBuf.BufferStart(), prefixLength); + // Write prefix + status = underlying->Append(prefixSlice); + if (!status.ok()) { + return status; + } + } + // Create cipher stream + std::unique_ptr stream; + status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream); + if (!status.ok()) { + return status; + } + (*result) = std::unique_ptr(new EncryptedWritableFile(underlying.release(), stream.release(), prefixLength)); + return Status::OK(); + } + + // Create an object that writes to a new file with the specified + // name. Deletes any existing file with the same name and creates a + // new file. On success, stores a pointer to the new file in + // *result and returns OK. On failure stores nullptr in *result and + // returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + if (options.use_mmap_writes) { + return Status::InvalidArgument(); + } + // Open file using underlying Env implementation + std::unique_ptr underlying; + Status status = EnvWrapper::ReopenWritableFile(fname, &underlying, options); + if (!status.ok()) { + return status; + } + // Initialize & write prefix (if needed) + AlignedBuffer prefixBuf; + Slice prefixSlice; + size_t prefixLength = provider_->GetPrefixLength(); + if (prefixLength > 0) { + // Initialize prefix + prefixBuf.Alignment(underlying->GetRequiredBufferAlignment()); + prefixBuf.AllocateNewBuffer(prefixLength); + provider_->CreateNewPrefix(fname, prefixBuf.BufferStart(), prefixLength); + prefixSlice = Slice(prefixBuf.BufferStart(), prefixLength); + // Write prefix + status = underlying->Append(prefixSlice); + if (!status.ok()) { + return status; + } + } + // Create cipher stream + std::unique_ptr stream; + status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream); + if (!status.ok()) { + return status; + } + (*result) = std::unique_ptr(new EncryptedWritableFile(underlying.release(), stream.release(), prefixLength)); + return Status::OK(); + } + + // Reuse an existing file by renaming it and opening it as writable. + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + if (options.use_mmap_writes) { + return Status::InvalidArgument(); + } + // Open file using underlying Env implementation + std::unique_ptr underlying; + Status status = EnvWrapper::ReuseWritableFile(fname, old_fname, &underlying, options); + if (!status.ok()) { + return status; + } + // Initialize & write prefix (if needed) + AlignedBuffer prefixBuf; + Slice prefixSlice; + size_t prefixLength = provider_->GetPrefixLength(); + if (prefixLength > 0) { + // Initialize prefix + prefixBuf.Alignment(underlying->GetRequiredBufferAlignment()); + prefixBuf.AllocateNewBuffer(prefixLength); + provider_->CreateNewPrefix(fname, prefixBuf.BufferStart(), prefixLength); + prefixSlice = Slice(prefixBuf.BufferStart(), prefixLength); + // Write prefix + status = underlying->Append(prefixSlice); + if (!status.ok()) { + return status; + } + } + // Create cipher stream + std::unique_ptr stream; + status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream); + if (!status.ok()) { + return status; + } + (*result) = std::unique_ptr(new EncryptedWritableFile(underlying.release(), stream.release(), prefixLength)); + return Status::OK(); + } + + // Open `fname` for random read and write, if file doesn't exist the file + // will be created. On success, stores a pointer to the new file in + // *result and returns OK. On failure returns non-OK. + // + // The returned file will only be accessed by one thread at a time. + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + if (options.use_mmap_reads || options.use_mmap_writes) { + return Status::InvalidArgument(); + } + // Check file exists + bool isNewFile = !FileExists(fname).ok(); + + // Open file using underlying Env implementation + std::unique_ptr underlying; + Status status = EnvWrapper::NewRandomRWFile(fname, &underlying, options); + if (!status.ok()) { + return status; + } + // Read or Initialize & write prefix (if needed) + AlignedBuffer prefixBuf; + Slice prefixSlice; + size_t prefixLength = provider_->GetPrefixLength(); + if (prefixLength > 0) { + prefixBuf.Alignment(underlying->GetRequiredBufferAlignment()); + prefixBuf.AllocateNewBuffer(prefixLength); + if (!isNewFile) { + // File already exists, read prefix + status = underlying->Read(0, prefixLength, &prefixSlice, prefixBuf.BufferStart()); + if (!status.ok()) { + return status; + } + } else { + // File is new, initialize & write prefix + provider_->CreateNewPrefix(fname, prefixBuf.BufferStart(), prefixLength); + prefixSlice = Slice(prefixBuf.BufferStart(), prefixLength); + // Write prefix + status = underlying->Write(0, prefixSlice); + if (!status.ok()) { + return status; + } + } + } + // Create cipher stream + std::unique_ptr stream; + status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream); + if (!status.ok()) { + return status; + } + (*result) = std::unique_ptr(new EncryptedRandomRWFile(underlying.release(), stream.release(), prefixLength)); + return Status::OK(); + } + + // Store in *result the attributes of the children of the specified directory. + // In case the implementation lists the directory prior to iterating the files + // and files are concurrently deleted, the deleted files will be omitted from + // result. + // The name attributes are relative to "dir". + // Original contents of *results are dropped. + // Returns OK if "dir" exists and "*result" contains its children. + // NotFound if "dir" does not exist, the calling process does not have + // permission to access "dir", or if "dir" is invalid. + // IOError if an IO Error was encountered + Status GetChildrenFileAttributes( + const std::string& dir, std::vector* result) override { + auto status = EnvWrapper::GetChildrenFileAttributes(dir, result); + if (!status.ok()) { + return status; + } + size_t prefixLength = provider_->GetPrefixLength(); + for (auto it = std::begin(*result); it!=std::end(*result); ++it) { + assert(it->size_bytes >= prefixLength); + it->size_bytes -= prefixLength; + } + return Status::OK(); + } + + // Store the size of fname in *file_size. + Status GetFileSize(const std::string& fname, uint64_t* file_size) override { + auto status = EnvWrapper::GetFileSize(fname, file_size); + if (!status.ok()) { + return status; + } + size_t prefixLength = provider_->GetPrefixLength(); + assert(*file_size >= prefixLength); + *file_size -= prefixLength; + return Status::OK(); + } + + private: + EncryptionProvider *provider_; +}; + + +// Returns an Env that encrypts data when stored on disk and decrypts data when +// read from disk. +Env* NewEncryptedEnv(Env* base_env, EncryptionProvider* provider) { + return new EncryptedEnv(base_env, provider); +} + +// Encrypt one or more (partial) blocks of data at the file offset. +// Length of data is given in dataSize. +Status BlockAccessCipherStream::Encrypt(uint64_t fileOffset, char *data, size_t dataSize) { + // Calculate block index + auto blockSize = BlockSize(); + uint64_t blockIndex = fileOffset / blockSize; + size_t blockOffset = fileOffset % blockSize; + std::unique_ptr blockBuffer; + + std::string scratch; + AllocateScratch(scratch); + + // Encrypt individual blocks. + while (1) { + char *block = data; + size_t n = std::min(dataSize, blockSize - blockOffset); + if (n != blockSize) { + // We're not encrypting a full block. + // Copy data to blockBuffer + if (!blockBuffer.get()) { + // Allocate buffer + blockBuffer = std::unique_ptr(new char[blockSize]); + } + block = blockBuffer.get(); + // Copy plain data to block buffer + memmove(block + blockOffset, data, n); + } + auto status = EncryptBlock(blockIndex, block, (char*)scratch.data()); + if (!status.ok()) { + return status; + } + if (block != data) { + // Copy encrypted data back to `data`. + memmove(data, block + blockOffset, n); + } + dataSize -= n; + if (dataSize == 0) { + return Status::OK(); + } + data += n; + blockOffset = 0; + blockIndex++; + } +} + +// Decrypt one or more (partial) blocks of data at the file offset. +// Length of data is given in dataSize. +Status BlockAccessCipherStream::Decrypt(uint64_t fileOffset, char *data, size_t dataSize) { + // Calculate block index + auto blockSize = BlockSize(); + uint64_t blockIndex = fileOffset / blockSize; + size_t blockOffset = fileOffset % blockSize; + std::unique_ptr blockBuffer; + + std::string scratch; + AllocateScratch(scratch); + + assert(fileOffset < dataSize); + + // Decrypt individual blocks. + while (1) { + char *block = data; + size_t n = std::min(dataSize, blockSize - blockOffset); + if (n != blockSize) { + // We're not decrypting a full block. + // Copy data to blockBuffer + if (!blockBuffer.get()) { + // Allocate buffer + blockBuffer = std::unique_ptr(new char[blockSize]); + } + block = blockBuffer.get(); + // Copy encrypted data to block buffer + memmove(block + blockOffset, data, n); + } + auto status = DecryptBlock(blockIndex, block, (char*)scratch.data()); + if (!status.ok()) { + return status; + } + if (block != data) { + // Copy decrypted data back to `data`. + memmove(data, block + blockOffset, n); + } + + // Simply decrementing dataSize by n could cause it to underflow, + // which will very likely make it read over the original bounds later + assert(dataSize >= n); + if (dataSize < n) { + return Status::Corruption("Cannot decrypt data at given offset"); + } + + dataSize -= n; + if (dataSize == 0) { + return Status::OK(); + } + data += n; + blockOffset = 0; + blockIndex++; + } +} + +// Encrypt a block of data. +// Length of data is equal to BlockSize(). +Status ROT13BlockCipher::Encrypt(char *data) { + for (size_t i = 0; i < blockSize_; ++i) { + data[i] += 13; + } + return Status::OK(); +} + +// Decrypt a block of data. +// Length of data is equal to BlockSize(). +Status ROT13BlockCipher::Decrypt(char *data) { + return Encrypt(data); +} + +// Allocate scratch space which is passed to EncryptBlock/DecryptBlock. +void CTRCipherStream::AllocateScratch(std::string& scratch) { + auto blockSize = cipher_.BlockSize(); + scratch.reserve(blockSize); +} + +// Encrypt a block of data at the given block index. +// Length of data is equal to BlockSize(); +Status CTRCipherStream::EncryptBlock(uint64_t blockIndex, char *data, char* scratch) { + + // Create nonce + counter + auto blockSize = cipher_.BlockSize(); + memmove(scratch, iv_.data(), blockSize); + EncodeFixed64(scratch, blockIndex + initialCounter_); + + // Encrypt nonce+counter + auto status = cipher_.Encrypt(scratch); + if (!status.ok()) { + return status; + } + + // XOR data with ciphertext. + for (size_t i = 0; i < blockSize; i++) { + data[i] = data[i] ^ scratch[i]; + } + return Status::OK(); +} + +// Decrypt a block of data at the given block index. +// Length of data is equal to BlockSize(); +Status CTRCipherStream::DecryptBlock(uint64_t blockIndex, char *data, char* scratch) { + // For CTR decryption & encryption are the same + return EncryptBlock(blockIndex, data, scratch); +} + +// GetPrefixLength returns the length of the prefix that is added to every file +// and used for storing encryption options. +// For optimal performance, the prefix length should be a multiple of +// the page size. +size_t CTREncryptionProvider::GetPrefixLength() { + return defaultPrefixLength; +} + +// decodeCTRParameters decodes the initial counter & IV from the given +// (plain text) prefix. +static void decodeCTRParameters(const char *prefix, size_t blockSize, uint64_t &initialCounter, Slice &iv) { + // First block contains 64-bit initial counter + initialCounter = DecodeFixed64(prefix); + // Second block contains IV + iv = Slice(prefix + blockSize, blockSize); +} + +// CreateNewPrefix initialized an allocated block of prefix memory +// for a new file. +Status CTREncryptionProvider::CreateNewPrefix(const std::string& /*fname*/, + char* prefix, + size_t prefixLength) { + // Create & seed rnd. + Random rnd((uint32_t)Env::Default()->NowMicros()); + // Fill entire prefix block with random values. + for (size_t i = 0; i < prefixLength; i++) { + prefix[i] = rnd.Uniform(256) & 0xFF; + } + // Take random data to extract initial counter & IV + auto blockSize = cipher_.BlockSize(); + uint64_t initialCounter; + Slice prefixIV; + decodeCTRParameters(prefix, blockSize, initialCounter, prefixIV); + + // Now populate the rest of the prefix, starting from the third block. + PopulateSecretPrefixPart(prefix + (2 * blockSize), prefixLength - (2 * blockSize), blockSize); + + // Encrypt the prefix, starting from block 2 (leave block 0, 1 with initial counter & IV unencrypted) + CTRCipherStream cipherStream(cipher_, prefixIV.data(), initialCounter); + auto status = cipherStream.Encrypt(0, prefix + (2 * blockSize), prefixLength - (2 * blockSize)); + if (!status.ok()) { + return status; + } + return Status::OK(); +} + +// PopulateSecretPrefixPart initializes the data into a new prefix block +// in plain text. +// Returns the amount of space (starting from the start of the prefix) +// that has been initialized. +size_t CTREncryptionProvider::PopulateSecretPrefixPart(char* /*prefix*/, + size_t /*prefixLength*/, + size_t /*blockSize*/) { + // Nothing to do here, put in custom data in override when needed. + return 0; +} + +Status CTREncryptionProvider::CreateCipherStream( + const std::string& fname, const EnvOptions& options, Slice& prefix, + std::unique_ptr* result) { + // Read plain text part of prefix. + auto blockSize = cipher_.BlockSize(); + uint64_t initialCounter; + Slice iv; + decodeCTRParameters(prefix.data(), blockSize, initialCounter, iv); + + // If the prefix is smaller than twice the block size, we would below read a + // very large chunk of the file (and very likely read over the bounds) + assert(prefix.size() >= 2 * blockSize); + if (prefix.size() < 2 * blockSize) { + return Status::Corruption("Unable to read from file " + fname + ": read attempt would read beyond file bounds"); + } + + // Decrypt the encrypted part of the prefix, starting from block 2 (block 0, 1 with initial counter & IV are unencrypted) + CTRCipherStream cipherStream(cipher_, iv.data(), initialCounter); + auto status = cipherStream.Decrypt(0, (char*)prefix.data() + (2 * blockSize), prefix.size() - (2 * blockSize)); + if (!status.ok()) { + return status; + } + + // Create cipher stream + return CreateCipherStreamFromPrefix(fname, options, initialCounter, iv, prefix, result); +} + +// CreateCipherStreamFromPrefix creates a block access cipher stream for a file given +// given name and options. The given prefix is already decrypted. +Status CTREncryptionProvider::CreateCipherStreamFromPrefix( + const std::string& /*fname*/, const EnvOptions& /*options*/, + uint64_t initialCounter, const Slice& iv, const Slice& /*prefix*/, + std::unique_ptr* result) { + (*result) = std::unique_ptr( + new CTRCipherStream(cipher_, iv.data(), initialCounter)); + return Status::OK(); +} + +#endif // ROCKSDB_LITE + +} // namespace rocksdb diff --git a/src/rocksdb/env/env_hdfs.cc b/src/rocksdb/env/env_hdfs.cc new file mode 100644 index 00000000..5acf9301 --- /dev/null +++ b/src/rocksdb/env/env_hdfs.cc @@ -0,0 +1,627 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include "rocksdb/env.h" +#include "hdfs/env_hdfs.h" + +#ifdef USE_HDFS +#ifndef ROCKSDB_HDFS_FILE_C +#define ROCKSDB_HDFS_FILE_C + +#include +#include +#include +#include +#include +#include +#include "rocksdb/status.h" +#include "util/logging.h" +#include "util/string_util.h" + +#define HDFS_EXISTS 0 +#define HDFS_DOESNT_EXIST -1 +#define HDFS_SUCCESS 0 + +// +// This file defines an HDFS environment for rocksdb. It uses the libhdfs +// api to access HDFS. All HDFS files created by one instance of rocksdb +// will reside on the same HDFS cluster. +// + +namespace rocksdb { + +namespace { + +// Log error message +static Status IOError(const std::string& context, int err_number) { + return (err_number == ENOSPC) + ? Status::NoSpace(context, strerror(err_number)) + : (err_number == ENOENT) + ? Status::PathNotFound(context, strerror(err_number)) + : Status::IOError(context, strerror(err_number)); +} + +// assume that there is one global logger for now. It is not thread-safe, +// but need not be because the logger is initialized at db-open time. +static Logger* mylog = nullptr; + +// Used for reading a file from HDFS. It implements both sequential-read +// access methods as well as random read access methods. +class HdfsReadableFile : virtual public SequentialFile, + virtual public RandomAccessFile { + private: + hdfsFS fileSys_; + std::string filename_; + hdfsFile hfile_; + + public: + HdfsReadableFile(hdfsFS fileSys, const std::string& fname) + : fileSys_(fileSys), filename_(fname), hfile_(nullptr) { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n", + filename_.c_str()); + hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0); + ROCKS_LOG_DEBUG(mylog, + "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n", + filename_.c_str(), hfile_); + } + + virtual ~HdfsReadableFile() { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n", + filename_.c_str()); + hdfsCloseFile(fileSys_, hfile_); + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n", + filename_.c_str()); + hfile_ = nullptr; + } + + bool isValid() { + return hfile_ != nullptr; + } + + // sequential access, read data at current offset in file + virtual Status Read(size_t n, Slice* result, char* scratch) { + Status s; + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n", + filename_.c_str(), n); + + char* buffer = scratch; + size_t total_bytes_read = 0; + tSize bytes_read = 0; + tSize remaining_bytes = (tSize)n; + + // Read a total of n bytes repeatedly until we hit error or eof + while (remaining_bytes > 0) { + bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes); + if (bytes_read <= 0) { + break; + } + assert(bytes_read <= remaining_bytes); + + total_bytes_read += bytes_read; + remaining_bytes -= bytes_read; + buffer += bytes_read; + } + assert(total_bytes_read <= n); + + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n", + filename_.c_str()); + + if (bytes_read < 0) { + s = IOError(filename_, errno); + } else { + *result = Slice(scratch, total_bytes_read); + } + + return s; + } + + // random access, read data from specified offset in file + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + Status s; + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n", + filename_.c_str()); + ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset, + (void*)scratch, (tSize)n); + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n", + filename_.c_str()); + *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read); + if (bytes_read < 0) { + // An error: return a non-ok status + s = IOError(filename_, errno); + } + return s; + } + + virtual Status Skip(uint64_t n) { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n", + filename_.c_str()); + // get current offset from file + tOffset current = hdfsTell(fileSys_, hfile_); + if (current < 0) { + return IOError(filename_, errno); + } + // seek to new offset in file + tOffset newoffset = current + n; + int val = hdfsSeek(fileSys_, hfile_, newoffset); + if (val < 0) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + private: + + // returns true if we are at the end of file, false otherwise + bool feof() { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n", + filename_.c_str()); + if (hdfsTell(fileSys_, hfile_) == fileSize()) { + return true; + } + return false; + } + + // the current size of the file + tOffset fileSize() { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n", + filename_.c_str()); + hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str()); + tOffset size = 0L; + if (pFileInfo != nullptr) { + size = pFileInfo->mSize; + hdfsFreeFileInfo(pFileInfo, 1); + } else { + throw HdfsFatalException("fileSize on unknown file " + filename_); + } + return size; + } +}; + +// Appends to an existing file in HDFS. +class HdfsWritableFile: public WritableFile { + private: + hdfsFS fileSys_; + std::string filename_; + hdfsFile hfile_; + + public: + HdfsWritableFile(hdfsFS fileSys, const std::string& fname) + : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n", + filename_.c_str()); + hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0); + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n", + filename_.c_str()); + assert(hfile_ != nullptr); + } + virtual ~HdfsWritableFile() { + if (hfile_ != nullptr) { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n", + filename_.c_str()); + hdfsCloseFile(fileSys_, hfile_); + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n", + filename_.c_str()); + hfile_ = nullptr; + } + } + + // If the file was successfully created, then this returns true. + // Otherwise returns false. + bool isValid() { + return hfile_ != nullptr; + } + + // The name of the file, mostly needed for debug logging. + const std::string& getName() { + return filename_; + } + + virtual Status Append(const Slice& data) { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n", + filename_.c_str()); + const char* src = data.data(); + size_t left = data.size(); + size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast(left)); + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n", + filename_.c_str()); + if (ret != left) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + virtual Status Flush() { + return Status::OK(); + } + + virtual Status Sync() { + Status s; + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n", + filename_.c_str()); + if (hdfsFlush(fileSys_, hfile_) == -1) { + return IOError(filename_, errno); + } + if (hdfsHSync(fileSys_, hfile_) == -1) { + return IOError(filename_, errno); + } + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n", + filename_.c_str()); + return Status::OK(); + } + + // This is used by HdfsLogger to write data to the debug log file + virtual Status Append(const char* src, size_t size) { + if (hdfsWrite(fileSys_, hfile_, src, static_cast(size)) != + static_cast(size)) { + return IOError(filename_, errno); + } + return Status::OK(); + } + + virtual Status Close() { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n", + filename_.c_str()); + if (hdfsCloseFile(fileSys_, hfile_) != 0) { + return IOError(filename_, errno); + } + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n", + filename_.c_str()); + hfile_ = nullptr; + return Status::OK(); + } +}; + +// The object that implements the debug logs to reside in HDFS. +class HdfsLogger : public Logger { + private: + HdfsWritableFile* file_; + uint64_t (*gettid_)(); // Return the thread id for the current thread + + Status HdfsCloseHelper() { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", + file_->getName().c_str()); + if (mylog != nullptr && mylog == this) { + mylog = nullptr; + } + return Status::OK(); + } + + protected: + virtual Status CloseImpl() override { return HdfsCloseHelper(); } + + public: + HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) + : file_(f), gettid_(gettid) { + ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n", + file_->getName().c_str()); + } + + ~HdfsLogger() override { + if (!closed_) { + closed_ = true; + HdfsCloseHelper(); + } + } + + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + const uint64_t thread_id = (*gettid_)(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 30000; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + p += snprintf(p, limit - p, + "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec, + static_cast(now_tv.tv_usec), + static_cast(thread_id)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + file_->Append(base, p-base); + file_->Flush(); + if (base != buffer) { + delete[] base; + } + break; + } + } +}; + +} // namespace + +// Finally, the hdfs environment + +const std::string HdfsEnv::kProto = "hdfs://"; +const std::string HdfsEnv::pathsep = "/"; + +// open a file for sequential reading +Status HdfsEnv::NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& /*options*/) { + result->reset(); + HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); + if (f == nullptr || !f->isValid()) { + delete f; + *result = nullptr; + return IOError(fname, errno); + } + result->reset(dynamic_cast(f)); + return Status::OK(); +} + +// open a file for random reading +Status HdfsEnv::NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& /*options*/) { + result->reset(); + HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); + if (f == nullptr || !f->isValid()) { + delete f; + *result = nullptr; + return IOError(fname, errno); + } + result->reset(dynamic_cast(f)); + return Status::OK(); +} + +// create a new file for writing +Status HdfsEnv::NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& /*options*/) { + result->reset(); + Status s; + HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); + if (f == nullptr || !f->isValid()) { + delete f; + *result = nullptr; + return IOError(fname, errno); + } + result->reset(dynamic_cast(f)); + return Status::OK(); +} + +class HdfsDirectory : public Directory { + public: + explicit HdfsDirectory(int fd) : fd_(fd) {} + ~HdfsDirectory() {} + + Status Fsync() override { return Status::OK(); } + + int GetFd() const { return fd_; } + + private: + int fd_; +}; + +Status HdfsEnv::NewDirectory(const std::string& name, + std::unique_ptr* result) { + int value = hdfsExists(fileSys_, name.c_str()); + switch (value) { + case HDFS_EXISTS: + result->reset(new HdfsDirectory(0)); + return Status::OK(); + default: // fail if the directory doesn't exist + ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + ToString(value) + " on path " + name + + ".\n"); + } +} + +Status HdfsEnv::FileExists(const std::string& fname) { + int value = hdfsExists(fileSys_, fname.c_str()); + switch (value) { + case HDFS_EXISTS: + return Status::OK(); + case HDFS_DOESNT_EXIST: + return Status::NotFound(); + default: // anything else should be an error + ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed"); + return Status::IOError("hdfsExists call failed with error " + + ToString(value) + " on path " + fname + ".\n"); + } +} + +Status HdfsEnv::GetChildren(const std::string& path, + std::vector* result) { + int value = hdfsExists(fileSys_, path.c_str()); + switch (value) { + case HDFS_EXISTS: { // directory exists + int numEntries = 0; + hdfsFileInfo* pHdfsFileInfo = 0; + pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); + if (numEntries >= 0) { + for(int i = 0; i < numEntries; i++) { + std::string pathname(pHdfsFileInfo[i].mName); + size_t pos = pathname.rfind("/"); + if (std::string::npos != pos) { + result->push_back(pathname.substr(pos + 1)); + } + } + if (pHdfsFileInfo != nullptr) { + hdfsFreeFileInfo(pHdfsFileInfo, numEntries); + } + } else { + // numEntries < 0 indicates error + ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error "); + throw HdfsFatalException( + "hdfsListDirectory call failed negative error.\n"); + } + break; + } + case HDFS_DOESNT_EXIST: // directory does not exist, exit + return Status::NotFound(); + default: // anything else should be an error + ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + ToString(value) + ".\n"); + } + return Status::OK(); +} + +Status HdfsEnv::DeleteFile(const std::string& fname) { + if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) { + return Status::OK(); + } + return IOError(fname, errno); +}; + +Status HdfsEnv::CreateDir(const std::string& name) { + if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) { + return Status::OK(); + } + return IOError(name, errno); +}; + +Status HdfsEnv::CreateDirIfMissing(const std::string& name) { + const int value = hdfsExists(fileSys_, name.c_str()); + // Not atomic. state might change b/w hdfsExists and CreateDir. + switch (value) { + case HDFS_EXISTS: + return Status::OK(); + case HDFS_DOESNT_EXIST: + return CreateDir(name); + default: // anything else should be an error + ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed"); + throw HdfsFatalException("hdfsExists call failed with error " + + ToString(value) + ".\n"); + } +}; + +Status HdfsEnv::DeleteDir(const std::string& name) { + return DeleteFile(name); +}; + +Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) { + *size = 0L; + hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); + if (pFileInfo != nullptr) { + *size = pFileInfo->mSize; + hdfsFreeFileInfo(pFileInfo, 1); + return Status::OK(); + } + return IOError(fname, errno); +} + +Status HdfsEnv::GetFileModificationTime(const std::string& fname, + uint64_t* time) { + hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); + if (pFileInfo != nullptr) { + *time = static_cast(pFileInfo->mLastMod); + hdfsFreeFileInfo(pFileInfo, 1); + return Status::OK(); + } + return IOError(fname, errno); + +} + +// The rename is not atomic. HDFS does not allow a renaming if the +// target already exists. So, we delete the target before attempting the +// rename. +Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { + hdfsDelete(fileSys_, target.c_str(), 1); + if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) { + return Status::OK(); + } + return IOError(src, errno); +} + +Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) { + // there isn's a very good way to atomically check and create + // a file via libhdfs + *lock = nullptr; + return Status::OK(); +} + +Status HdfsEnv::UnlockFile(FileLock* /*lock*/) { return Status::OK(); } + +Status HdfsEnv::NewLogger(const std::string& fname, + std::shared_ptr* result) { + HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); + if (f == nullptr || !f->isValid()) { + delete f; + *result = nullptr; + return IOError(fname, errno); + } + HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid); + result->reset(h); + if (mylog == nullptr) { + // mylog = h; // uncomment this for detailed logging + } + return Status::OK(); +} + +// The factory method for creating an HDFS Env +Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) { + *hdfs_env = new HdfsEnv(fsname); + return Status::OK(); +} +} // namespace rocksdb + +#endif // ROCKSDB_HDFS_FILE_C + +#else // USE_HDFS + +// dummy placeholders used when HDFS is not available +namespace rocksdb { +Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/, + std::unique_ptr* /*result*/, + const EnvOptions& /*options*/) { + return Status::NotSupported("Not compiled with hdfs support"); +} + + Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) { + return Status::NotSupported("Not compiled with hdfs support"); + } +} + +#endif diff --git a/src/rocksdb/env/env_posix.cc b/src/rocksdb/env/env_posix.cc new file mode 100644 index 00000000..387c0279 --- /dev/null +++ b/src/rocksdb/env/env_posix.cc @@ -0,0 +1,1128 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors +#include +#include +#include +#if defined(OS_LINUX) +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID) +#include +#include +#include +#endif +#include +#include +#include +#include +#include +// Get nano time includes +#if defined(OS_LINUX) || defined(OS_FREEBSD) +#elif defined(__MACH__) +#include +#include +#else +#include +#endif +#include +#include +#include + +#include "env/io_posix.h" +#include "env/posix_logger.h" +#include "monitoring/iostats_context_imp.h" +#include "monitoring/thread_status_updater.h" +#include "port/port.h" +#include "rocksdb/options.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/compression_context_cache.h" +#include "util/logging.h" +#include "util/random.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/thread_local.h" +#include "util/threadpool_imp.h" + +#if !defined(TMPFS_MAGIC) +#define TMPFS_MAGIC 0x01021994 +#endif +#if !defined(XFS_SUPER_MAGIC) +#define XFS_SUPER_MAGIC 0x58465342 +#endif +#if !defined(EXT4_SUPER_MAGIC) +#define EXT4_SUPER_MAGIC 0xEF53 +#endif + +namespace rocksdb { + +namespace { + +ThreadStatusUpdater* CreateThreadStatusUpdater() { + return new ThreadStatusUpdater(); +} + +inline mode_t GetDBFileMode(bool allow_non_owner_access) { + return allow_non_owner_access ? 0644 : 0600; +} + +// list of pathnames that are locked +static std::set lockedFiles; +static port::Mutex mutex_lockedFiles; + +static int LockOrUnlock(int fd, bool lock) { + errno = 0; + struct flock f; + memset(&f, 0, sizeof(f)); + f.l_type = (lock ? F_WRLCK : F_UNLCK); + f.l_whence = SEEK_SET; + f.l_start = 0; + f.l_len = 0; // Lock/unlock entire file + int value = fcntl(fd, F_SETLK, &f); + + return value; +} + +class PosixFileLock : public FileLock { + public: + int fd_; + std::string filename; +}; + +int cloexec_flags(int flags, const EnvOptions* options) { + // If the system supports opening the file with cloexec enabled, + // do so, as this avoids a race condition if a db is opened around + // the same time that a child process is forked +#ifdef O_CLOEXEC + if (options == nullptr || options->set_fd_cloexec) { + flags |= O_CLOEXEC; + } +#endif + return flags; +} + +class PosixEnv : public Env { + public: + PosixEnv(); + + ~PosixEnv() override { + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].JoinAllThreads(); + } + // Delete the thread_status_updater_ only when the current Env is not + // Env::Default(). This is to avoid the free-after-use error when + // Env::Default() is destructed while some other child threads are + // still trying to update thread status. + if (this != Env::Default()) { + delete thread_status_updater_; + } + } + + void SetFD_CLOEXEC(int fd, const EnvOptions* options) { + if ((options == nullptr || options->set_fd_cloexec) && fd > 0) { + fcntl(fd, F_SETFD, fcntl(fd, F_GETFD) | FD_CLOEXEC); + } + } + + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + int fd = -1; + int flags = cloexec_flags(O_RDONLY, &options); + FILE* file = nullptr; + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; +#endif + } + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + return IOError("While opening a file for sequentially reading", fname, + errno); + } + + SetFD_CLOEXEC(fd, &options); + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + return IOError("While fcntl NoCache", fname, errno); + } +#endif + } else { + do { + IOSTATS_TIMER_GUARD(open_nanos); + file = fdopen(fd, "r"); + } while (file == nullptr && errno == EINTR); + if (file == nullptr) { + close(fd); + return IOError("While opening file for sequentially read", fname, + errno); + } + } + result->reset(new PosixSequentialFile(fname, file, fd, options)); + return Status::OK(); + } + + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + Status s; + int fd; + int flags = cloexec_flags(O_RDONLY, &options); + + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; + TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags); +#endif + } + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + return IOError("While open a file for random read", fname, errno); + } + SetFD_CLOEXEC(fd, &options); + + if (options.use_mmap_reads && sizeof(void*) >= 8) { + // Use of mmap for random reads has been removed because it + // kills performance when storage is fast. + // Use mmap when virtual address-space is plentiful. + uint64_t size; + s = GetFileSize(fname, &size); + if (s.ok()) { + void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0); + if (base != MAP_FAILED) { + result->reset(new PosixMmapReadableFile(fd, fname, base, + size, options)); + } else { + s = IOError("while mmap file for read", fname, errno); + close(fd); + } + } + } else { + if (options.use_direct_reads && !options.use_mmap_reads) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + return IOError("while fcntl NoCache", fname, errno); + } +#endif + } + result->reset(new PosixRandomAccessFile(fname, fd, options)); + } + return s; + } + + virtual Status OpenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options, + bool reopen = false) { + result->reset(); + Status s; + int fd = -1; + int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC); + // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) + if (options.use_direct_writes && !options.use_mmap_writes) { + // Note: we should avoid O_APPEND here due to ta the following bug: + // POSIX requires that opening a file with the O_APPEND flag should + // have no affect on the location at which pwrite() writes data. + // However, on Linux, if a file is opened with O_APPEND, pwrite() + // appends data to the end of the file, regardless of the value of + // offset. + // More info here: https://linux.die.net/man/2/pwrite +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // ROCKSDB_LITE + flags |= O_WRONLY; +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; +#endif + TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + } else if (options.use_mmap_writes) { + // non-direct I/O + flags |= O_RDWR; + } else { + flags |= O_WRONLY; + } + + flags = cloexec_flags(flags, &options); + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + + if (fd < 0) { + s = IOError("While open a file for appending", fname, errno); + return s; + } + SetFD_CLOEXEC(fd, &options); + + if (options.use_mmap_writes) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. + // do not use mmapWrite on non ext-3/xfs/tmpfs systems. + if (!SupportsFastAllocate(fname)) { + forceMmapOff_ = true; + } + checkedDiskForMmap_ = true; + } + } + if (options.use_mmap_writes && !forceMmapOff_) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + s = IOError("While fcntl NoCache an opened file for appending", fname, + errno); + return s; + } +#elif defined(OS_SOLARIS) + if (directio(fd, DIRECTIO_ON) == -1) { + if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON + close(fd); + s = IOError("While calling directio()", fname, errno); + return s; + } + } +#endif + result->reset(new PosixWritableFile(fname, fd, options)); + } else { + // disable mmap writes + EnvOptions no_mmap_writes_options = options; + no_mmap_writes_options.use_mmap_writes = false; + result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + } + return s; + } + + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + return OpenWritableFile(fname, result, options, false); + } + + Status ReopenWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + return OpenWritableFile(fname, result, options, true); + } + + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override { + result->reset(); + Status s; + int fd = -1; + + int flags = 0; + // Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX) + if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef ROCKSDB_LITE + return Status::IOError(fname, "Direct I/O not supported in RocksDB lite"); +#endif // !ROCKSDB_LITE + flags |= O_WRONLY; +#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS) + flags |= O_DIRECT; +#endif + TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags); + } else if (options.use_mmap_writes) { + // mmap needs O_RDWR mode + flags |= O_RDWR; + } else { + flags |= O_WRONLY; + } + + flags = cloexec_flags(flags, &options); + + do { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(old_fname.c_str(), flags, + GetDBFileMode(allow_non_owner_access_)); + } while (fd < 0 && errno == EINTR); + if (fd < 0) { + s = IOError("while reopen file for write", fname, errno); + return s; + } + + SetFD_CLOEXEC(fd, &options); + // rename into place + if (rename(old_fname.c_str(), fname.c_str()) != 0) { + s = IOError("while rename file to " + fname, old_fname, errno); + close(fd); + return s; + } + + if (options.use_mmap_writes) { + if (!checkedDiskForMmap_) { + // this will be executed once in the program's lifetime. + // do not use mmapWrite on non ext-3/xfs/tmpfs systems. + if (!SupportsFastAllocate(fname)) { + forceMmapOff_ = true; + } + checkedDiskForMmap_ = true; + } + } + if (options.use_mmap_writes && !forceMmapOff_) { + result->reset(new PosixMmapFile(fname, fd, page_size_, options)); + } else if (options.use_direct_writes && !options.use_mmap_writes) { +#ifdef OS_MACOSX + if (fcntl(fd, F_NOCACHE, 1) == -1) { + close(fd); + s = IOError("while fcntl NoCache for reopened file for append", fname, + errno); + return s; + } +#elif defined(OS_SOLARIS) + if (directio(fd, DIRECTIO_ON) == -1) { + if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON + close(fd); + s = IOError("while calling directio()", fname, errno); + return s; + } + } +#endif + result->reset(new PosixWritableFile(fname, fd, options)); + } else { + // disable mmap writes + EnvOptions no_mmap_writes_options = options; + no_mmap_writes_options.use_mmap_writes = false; + result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + } + return s; + } + + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override { + int fd = -1; + int flags = cloexec_flags(O_RDWR, &options); + + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + + fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_)); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + return IOError("While open file for random read/write", fname, errno); + } + } + + SetFD_CLOEXEC(fd, &options); + result->reset(new PosixRandomRWFile(fname, fd, options)); + return Status::OK(); + } + + Status NewMemoryMappedFileBuffer( + const std::string& fname, + std::unique_ptr* result) override { + int fd = -1; + Status status; + int flags = cloexec_flags(O_RDWR, nullptr); + + while (fd < 0) { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, 0644); + if (fd < 0) { + // Error while opening the file + if (errno == EINTR) { + continue; + } + status = + IOError("While open file for raw mmap buffer access", fname, errno); + break; + } + } + uint64_t size; + if (status.ok()) { + status = GetFileSize(fname, &size); + } + void* base = nullptr; + if (status.ok()) { + base = mmap(nullptr, static_cast(size), PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + status = IOError("while mmap file for read", fname, errno); + } + } + if (status.ok()) { + result->reset( + new PosixMemoryMappedFileBuffer(base, static_cast(size))); + } + if (fd >= 0) { + // don't need to keep it open after mmap has been called + close(fd); + } + return status; + } + + Status NewDirectory(const std::string& name, + std::unique_ptr* result) override { + result->reset(); + int fd; + int flags = cloexec_flags(0, nullptr); + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(name.c_str(), flags); + } + if (fd < 0) { + return IOError("While open directory", name, errno); + } else { + result->reset(new PosixDirectory(fd)); + } + return Status::OK(); + } + + Status FileExists(const std::string& fname) override { + int result = access(fname.c_str(), F_OK); + + if (result == 0) { + return Status::OK(); + } + + int err = errno; + switch (err) { + case EACCES: + case ELOOP: + case ENAMETOOLONG: + case ENOENT: + case ENOTDIR: + return Status::NotFound(); + default: + assert(err == EIO || err == ENOMEM); + return Status::IOError("Unexpected error(" + ToString(err) + + ") accessing file `" + fname + "' "); + } + } + + Status GetChildren(const std::string& dir, + std::vector* result) override { + result->clear(); + DIR* d = opendir(dir.c_str()); + if (d == nullptr) { + switch (errno) { + case EACCES: + case ENOENT: + case ENOTDIR: + return Status::NotFound(); + default: + return IOError("While opendir", dir, errno); + } + } + struct dirent* entry; + while ((entry = readdir(d)) != nullptr) { + result->push_back(entry->d_name); + } + closedir(d); + return Status::OK(); + } + + Status DeleteFile(const std::string& fname) override { + Status result; + if (unlink(fname.c_str()) != 0) { + result = IOError("while unlink() file", fname, errno); + } + return result; + }; + + Status CreateDir(const std::string& name) override { + Status result; + if (mkdir(name.c_str(), 0755) != 0) { + result = IOError("While mkdir", name, errno); + } + return result; + }; + + Status CreateDirIfMissing(const std::string& name) override { + Status result; + if (mkdir(name.c_str(), 0755) != 0) { + if (errno != EEXIST) { + result = IOError("While mkdir if missing", name, errno); + } else if (!DirExists(name)) { // Check that name is actually a + // directory. + // Message is taken from mkdir + result = Status::IOError("`"+name+"' exists but is not a directory"); + } + } + return result; + }; + + Status DeleteDir(const std::string& name) override { + Status result; + if (rmdir(name.c_str()) != 0) { + result = IOError("file rmdir", name, errno); + } + return result; + }; + + Status GetFileSize(const std::string& fname, uint64_t* size) override { + Status s; + struct stat sbuf; + if (stat(fname.c_str(), &sbuf) != 0) { + *size = 0; + s = IOError("while stat a file for size", fname, errno); + } else { + *size = sbuf.st_size; + } + return s; + } + + Status GetFileModificationTime(const std::string& fname, + uint64_t* file_mtime) override { + struct stat s; + if (stat(fname.c_str(), &s) !=0) { + return IOError("while stat a file for modification time", fname, errno); + } + *file_mtime = static_cast(s.st_mtime); + return Status::OK(); + } + Status RenameFile(const std::string& src, + const std::string& target) override { + Status result; + if (rename(src.c_str(), target.c_str()) != 0) { + result = IOError("While renaming a file to " + target, src, errno); + } + return result; + } + + Status LinkFile(const std::string& src, const std::string& target) override { + Status result; + if (link(src.c_str(), target.c_str()) != 0) { + if (errno == EXDEV) { + return Status::NotSupported("No cross FS links allowed"); + } + result = IOError("while link file to " + target, src, errno); + } + return result; + } + + Status NumFileLinks(const std::string& fname, uint64_t* count) override { + struct stat s; + if (stat(fname.c_str(), &s) != 0) { + return IOError("while stat a file for num file links", fname, errno); + } + *count = static_cast(s.st_nlink); + return Status::OK(); + } + + Status AreFilesSame(const std::string& first, const std::string& second, + bool* res) override { + struct stat statbuf[2]; + if (stat(first.c_str(), &statbuf[0]) != 0) { + return IOError("stat file", first, errno); + } + if (stat(second.c_str(), &statbuf[1]) != 0) { + return IOError("stat file", second, errno); + } + + if (major(statbuf[0].st_dev) != major(statbuf[1].st_dev) || + minor(statbuf[0].st_dev) != minor(statbuf[1].st_dev) || + statbuf[0].st_ino != statbuf[1].st_ino) { + *res = false; + } else { + *res = true; + } + return Status::OK(); + } + + Status LockFile(const std::string& fname, FileLock** lock) override { + *lock = nullptr; + Status result; + + mutex_lockedFiles.Lock(); + // If it already exists in the lockedFiles set, then it is already locked, + // and fail this lock attempt. Otherwise, insert it into lockedFiles. + // This check is needed because fcntl() does not detect lock conflict + // if the fcntl is issued by the same thread that earlier acquired + // this lock. + // We must do this check *before* opening the file: + // Otherwise, we will open a new file descriptor. Locks are associated with + // a process, not a file descriptor and when *any* file descriptor is closed, + // all locks the process holds for that *file* are released + if (lockedFiles.insert(fname).second == false) { + mutex_lockedFiles.Unlock(); + errno = ENOLCK; + return IOError("lock ", fname, errno); + } + + int fd; + int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr); + + { + IOSTATS_TIMER_GUARD(open_nanos); + fd = open(fname.c_str(), flags, 0644); + } + if (fd < 0) { + result = IOError("while open a file for lock", fname, errno); + } else if (LockOrUnlock(fd, true) == -1) { + // if there is an error in locking, then remove the pathname from lockedfiles + lockedFiles.erase(fname); + result = IOError("While lock file", fname, errno); + close(fd); + } else { + SetFD_CLOEXEC(fd, nullptr); + PosixFileLock* my_lock = new PosixFileLock; + my_lock->fd_ = fd; + my_lock->filename = fname; + *lock = my_lock; + } + + mutex_lockedFiles.Unlock(); + return result; + } + + Status UnlockFile(FileLock* lock) override { + PosixFileLock* my_lock = reinterpret_cast(lock); + Status result; + mutex_lockedFiles.Lock(); + // If we are unlocking, then verify that we had locked it earlier, + // it should already exist in lockedFiles. Remove it from lockedFiles. + if (lockedFiles.erase(my_lock->filename) != 1) { + errno = ENOLCK; + result = IOError("unlock", my_lock->filename, errno); + } else if (LockOrUnlock(my_lock->fd_, false) == -1) { + result = IOError("unlock", my_lock->filename, errno); + } + close(my_lock->fd_); + delete my_lock; + mutex_lockedFiles.Unlock(); + return result; + } + + void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW, + void* tag = nullptr, + void (*unschedFunction)(void* arg) = nullptr) override; + + int UnSchedule(void* arg, Priority pri) override; + + void StartThread(void (*function)(void* arg), void* arg) override; + + void WaitForJoin() override; + + unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override; + + Status GetTestDirectory(std::string* result) override { + const char* env = getenv("TEST_TMPDIR"); + if (env && env[0] != '\0') { + *result = env; + } else { + char buf[100]; + snprintf(buf, sizeof(buf), "/tmp/rocksdbtest-%d", int(geteuid())); + *result = buf; + } + // Directory may already exist + CreateDir(*result); + return Status::OK(); + } + + Status GetThreadList(std::vector* thread_list) override { + assert(thread_status_updater_); + return thread_status_updater_->GetThreadList(thread_list); + } + + static uint64_t gettid(pthread_t tid) { + uint64_t thread_id = 0; + memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); + return thread_id; + } + + static uint64_t gettid() { + pthread_t tid = pthread_self(); + return gettid(tid); + } + + uint64_t GetThreadID() const override { return gettid(pthread_self()); } + + Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override { + struct statvfs sbuf; + + if (statvfs(fname.c_str(), &sbuf) < 0) { + return IOError("While doing statvfs", fname, errno); + } + + *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree); + return Status::OK(); + } + + Status NewLogger(const std::string& fname, + std::shared_ptr* result) override { + FILE* f; + { + IOSTATS_TIMER_GUARD(open_nanos); + f = fopen(fname.c_str(), "w" +#ifdef __GLIBC_PREREQ +#if __GLIBC_PREREQ(2, 7) + "e" // glibc extension to enable O_CLOEXEC +#endif +#endif + ); + } + if (f == nullptr) { + result->reset(); + return IOError("when fopen a file for new logger", fname, errno); + } else { + int fd = fileno(f); +#ifdef ROCKSDB_FALLOCATE_PRESENT + fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024); +#endif + SetFD_CLOEXEC(fd, nullptr); + result->reset(new PosixLogger(f, &PosixEnv::gettid, this)); + return Status::OK(); + } + } + + uint64_t NowMicros() override { + struct timeval tv; + gettimeofday(&tv, nullptr); + return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; + } + + uint64_t NowNanos() override { +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#elif defined(OS_SOLARIS) + return gethrtime(); +#elif defined(__MACH__) + clock_serv_t cclock; + mach_timespec_t ts; + host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); + clock_get_time(cclock, &ts); + mach_port_deallocate(mach_task_self(), cclock); + return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#else + return std::chrono::duration_cast( + std::chrono::steady_clock::now().time_since_epoch()).count(); +#endif + } + + uint64_t NowCPUNanos() override { +#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \ + defined(__MACH__) + struct timespec ts; + clock_gettime(CLOCK_THREAD_CPUTIME_ID, &ts); + return static_cast(ts.tv_sec) * 1000000000 + ts.tv_nsec; +#endif + return 0; + } + + void SleepForMicroseconds(int micros) override { usleep(micros); } + + Status GetHostName(char* name, uint64_t len) override { + int ret = gethostname(name, static_cast(len)); + if (ret < 0) { + if (errno == EFAULT || errno == EINVAL) + return Status::InvalidArgument(strerror(errno)); + else + return IOError("GetHostName", name, errno); + } + return Status::OK(); + } + + Status GetCurrentTime(int64_t* unix_time) override { + time_t ret = time(nullptr); + if (ret == (time_t) -1) { + return IOError("GetCurrentTime", "", errno); + } + *unix_time = (int64_t) ret; + return Status::OK(); + } + + Status GetAbsolutePath(const std::string& db_path, + std::string* output_path) override { + if (!db_path.empty() && db_path[0] == '/') { + *output_path = db_path; + return Status::OK(); + } + + char the_path[256]; + char* ret = getcwd(the_path, 256); + if (ret == nullptr) { + return Status::IOError(strerror(errno)); + } + + *output_path = ret; + return Status::OK(); + } + + // Allow increasing the number of worker threads. + void SetBackgroundThreads(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].SetBackgroundThreads(num); + } + + int GetBackgroundThreads(Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetBackgroundThreads(); + } + + Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override { + allow_non_owner_access_ = allow_non_owner_access; + return Status::OK(); + } + + // Allow increasing the number of worker threads. + void IncBackgroundThreadsIfNeeded(int num, Priority pri) override { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].IncBackgroundThreadsIfNeeded(num); + } + + void LowerThreadPoolIOPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerIOPriority(); +#else + (void)pool; +#endif + } + + void LowerThreadPoolCPUPriority(Priority pool = LOW) override { + assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH); +#ifdef OS_LINUX + thread_pools_[pool].LowerCPUPriority(); +#else + (void)pool; +#endif + } + + std::string TimeToString(uint64_t secondsSince1970) override { + const time_t seconds = (time_t)secondsSince1970; + struct tm t; + int maxsize = 64; + std::string dummy; + dummy.reserve(maxsize); + dummy.resize(maxsize); + char* p = &dummy[0]; + localtime_r(&seconds, &t); + snprintf(p, maxsize, + "%04d/%02d/%02d-%02d:%02d:%02d ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec); + return dummy; + } + + EnvOptions OptimizeForLogWrite(const EnvOptions& env_options, + const DBOptions& db_options) const override { + EnvOptions optimized = env_options; + optimized.use_mmap_writes = false; + optimized.use_direct_writes = false; + optimized.bytes_per_sync = db_options.wal_bytes_per_sync; + // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it + // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit + // test and make this false + optimized.fallocate_with_keep_size = true; + optimized.writable_file_max_buffer_size = + db_options.writable_file_max_buffer_size; + return optimized; + } + + EnvOptions OptimizeForManifestWrite( + const EnvOptions& env_options) const override { + EnvOptions optimized = env_options; + optimized.use_mmap_writes = false; + optimized.use_direct_writes = false; + optimized.fallocate_with_keep_size = true; + return optimized; + } + + private: + bool checkedDiskForMmap_; + bool forceMmapOff_; // do we override Env options? + + // Returns true iff the named directory exists and is a directory. + virtual bool DirExists(const std::string& dname) { + struct stat statbuf; + if (stat(dname.c_str(), &statbuf) == 0) { + return S_ISDIR(statbuf.st_mode); + } + return false; // stat() failed return false + } + + bool SupportsFastAllocate(const std::string& path) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + struct statfs s; + if (statfs(path.c_str(), &s)){ + return false; + } + switch (s.f_type) { + case EXT4_SUPER_MAGIC: + return true; + case XFS_SUPER_MAGIC: + return true; + case TMPFS_MAGIC: + return true; + default: + return false; + } +#else + (void)path; + return false; +#endif + } + + size_t page_size_; + + std::vector thread_pools_; + pthread_mutex_t mu_; + std::vector threads_to_join_; + // If true, allow non owner read access for db files. Otherwise, non-owner + // has no access to db files. + bool allow_non_owner_access_; +}; + +PosixEnv::PosixEnv() + : checkedDiskForMmap_(false), + forceMmapOff_(false), + page_size_(getpagesize()), + thread_pools_(Priority::TOTAL), + allow_non_owner_access_(true) { + ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); + for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) { + thread_pools_[pool_id].SetThreadPriority( + static_cast(pool_id)); + // This allows later initializing the thread-local-env of each thread. + thread_pools_[pool_id].SetHostEnv(this); + } + thread_status_updater_ = CreateThreadStatusUpdater(); +} + +void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri, + void* tag, void (*unschedFunction)(void* arg)) { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + thread_pools_[pri].Schedule(function, arg, tag, unschedFunction); +} + +int PosixEnv::UnSchedule(void* arg, Priority pri) { + return thread_pools_[pri].UnSchedule(arg); +} + +unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const { + assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH); + return thread_pools_[pri].GetQueueLen(); +} + +struct StartThreadState { + void (*user_function)(void*); + void* arg; +}; + +static void* StartThreadWrapper(void* arg) { + StartThreadState* state = reinterpret_cast(arg); + state->user_function(state->arg); + delete state; + return nullptr; +} + +void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { + pthread_t t; + StartThreadState* state = new StartThreadState; + state->user_function = function; + state->arg = arg; + ThreadPoolImpl::PthreadCall( + "start thread", pthread_create(&t, nullptr, &StartThreadWrapper, state)); + ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_)); + threads_to_join_.push_back(t); + ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_)); +} + +void PosixEnv::WaitForJoin() { + for (const auto tid : threads_to_join_) { + pthread_join(tid, nullptr); + } + threads_to_join_.clear(); +} + +} // namespace + +std::string Env::GenerateUniqueId() { + std::string uuid_file = "/proc/sys/kernel/random/uuid"; + + Status s = FileExists(uuid_file); + if (s.ok()) { + std::string uuid; + s = ReadFileToString(this, uuid_file, &uuid); + if (s.ok()) { + return uuid; + } + } + // Could not read uuid_file - generate uuid using "nanos-random" + Random64 r(time(nullptr)); + uint64_t random_uuid_portion = + r.Uniform(std::numeric_limits::max()); + uint64_t nanos_uuid_portion = NowNanos(); + char uuid2[200]; + snprintf(uuid2, + 200, + "%lx-%lx", + (unsigned long)nanos_uuid_portion, + (unsigned long)random_uuid_portion); + return uuid2; +} + +// +// Default Posix Env +// +Env* Env::Default() { + // The following function call initializes the singletons of ThreadLocalPtr + // right before the static default_env. This guarantees default_env will + // always being destructed before the ThreadLocalPtr singletons get + // destructed as C++ guarantees that the destructions of static variables + // is in the reverse order of their constructions. + // + // Since static members are destructed in the reverse order + // of their construction, having this call here guarantees that + // the destructor of static PosixEnv will go first, then the + // the singletons of ThreadLocalPtr. + ThreadLocalPtr::InitSingletons(); + CompressionContextCache::InitSingleton(); + INIT_SYNC_POINT_SINGLETONS(); + static PosixEnv default_env; + return &default_env; +} + +} // namespace rocksdb diff --git a/src/rocksdb/env/env_test.cc b/src/rocksdb/env/env_test.cc new file mode 100644 index 00000000..47800928 --- /dev/null +++ b/src/rocksdb/env/env_test.cc @@ -0,0 +1,1774 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef OS_WIN +#include +#endif + +#ifdef ROCKSDB_MALLOC_USABLE_SIZE +#ifdef OS_FREEBSD +#include +#else +#include +#endif +#endif +#include + +#include +#include +#include +#include + +#ifdef OS_LINUX +#include +#include +#include +#include +#include +#endif + +#ifdef ROCKSDB_FALLOCATE_PRESENT +#include +#endif + +#include "env/env_chroot.h" +#include "port/port.h" +#include "rocksdb/env.h" +#include "util/coding.h" +#include "util/log_buffer.h" +#include "util/mutexlock.h" +#include "util/string_util.h" +#include "util/sync_point.h" +#include "util/testharness.h" +#include "util/testutil.h" + +#ifdef OS_LINUX +static const size_t kPageSize = sysconf(_SC_PAGESIZE); +#else +static const size_t kPageSize = 4 * 1024; +#endif + +namespace rocksdb { + +static const int kDelayMicros = 100000; + +struct Deleter { + explicit Deleter(void (*fn)(void*)) : fn_(fn) {} + + void operator()(void* ptr) { + assert(fn_); + assert(ptr); + (*fn_)(ptr); + } + + void (*fn_)(void*); +}; + +std::unique_ptr NewAligned(const size_t size, const char ch) { + char* ptr = nullptr; +#ifdef OS_WIN + if (nullptr == (ptr = reinterpret_cast(_aligned_malloc(size, kPageSize)))) { + return std::unique_ptr(nullptr, Deleter(_aligned_free)); + } + std::unique_ptr uptr(ptr, Deleter(_aligned_free)); +#else + if (posix_memalign(reinterpret_cast(&ptr), kPageSize, size) != 0) { + return std::unique_ptr(nullptr, Deleter(free)); + } + std::unique_ptr uptr(ptr, Deleter(free)); +#endif + memset(uptr.get(), ch, size); + return uptr; +} + +class EnvPosixTest : public testing::Test { + private: + port::Mutex mu_; + std::string events_; + + public: + Env* env_; + bool direct_io_; + EnvPosixTest() : env_(Env::Default()), direct_io_(false) {} +}; + +class EnvPosixTestWithParam + : public EnvPosixTest, + public ::testing::WithParamInterface> { + public: + EnvPosixTestWithParam() { + std::pair param_pair = GetParam(); + env_ = param_pair.first; + direct_io_ = param_pair.second; + } + + void WaitThreadPoolsEmpty() { + // Wait until the thread pools are empty. + while (env_->GetThreadPoolQueueLen(Env::Priority::LOW) != 0) { + Env::Default()->SleepForMicroseconds(kDelayMicros); + } + while (env_->GetThreadPoolQueueLen(Env::Priority::HIGH) != 0) { + Env::Default()->SleepForMicroseconds(kDelayMicros); + } + } + + ~EnvPosixTestWithParam() override { WaitThreadPoolsEmpty(); } +}; + +static void SetBool(void* ptr) { + reinterpret_cast*>(ptr)->store(true); +} + +TEST_F(EnvPosixTest, DISABLED_RunImmediately) { + for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) { + std::atomic called(false); + env_->SetBackgroundThreads(1, static_cast(pri)); + env_->Schedule(&SetBool, &called, static_cast(pri)); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(called.load()); + } +} + +TEST_F(EnvPosixTest, RunEventually) { + std::atomic called(false); + env_->StartThread(&SetBool, &called); + env_->WaitForJoin(); + ASSERT_TRUE(called.load()); +} + +#ifdef OS_WIN +TEST_F(EnvPosixTest, AreFilesSame) { + { + bool tmp; + if (env_->AreFilesSame("", "", &tmp).IsNotSupported()) { + fprintf(stderr, + "skipping EnvBasicTestWithParam.AreFilesSame due to " + "unsupported Env::AreFilesSame\n"); + return; + } + } + + const EnvOptions soptions; + auto* env = Env::Default(); + std::string same_file_name = test::PerThreadDBPath(env, "same_file"); + std::string same_file_link_name = same_file_name + "_link"; + + std::unique_ptr same_file; + ASSERT_OK(env->NewWritableFile(same_file_name, + &same_file, soptions)); + same_file->Append("random_data"); + ASSERT_OK(same_file->Flush()); + same_file.reset(); + + ASSERT_OK(env->LinkFile(same_file_name, same_file_link_name)); + bool result = false; + ASSERT_OK(env->AreFilesSame(same_file_name, same_file_link_name, &result)); + ASSERT_TRUE(result); +} +#endif + +#ifdef OS_LINUX +TEST_F(EnvPosixTest, DISABLED_FilePermission) { + // Only works for Linux environment + if (env_ == Env::Default()) { + EnvOptions soptions; + std::vector fileNames{ + test::PerThreadDBPath(env_, "testfile"), + test::PerThreadDBPath(env_, "testfile1")}; + std::unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fileNames[0], &wfile, soptions)); + ASSERT_OK(env_->NewWritableFile(fileNames[1], &wfile, soptions)); + wfile.reset(); + std::unique_ptr rwfile; + ASSERT_OK(env_->NewRandomRWFile(fileNames[1], &rwfile, soptions)); + + struct stat sb; + for (const auto& filename : fileNames) { + if (::stat(filename.c_str(), &sb) == 0) { + ASSERT_EQ(sb.st_mode & 0777, 0644); + } + env_->DeleteFile(filename); + } + + env_->SetAllowNonOwnerAccess(false); + ASSERT_OK(env_->NewWritableFile(fileNames[0], &wfile, soptions)); + ASSERT_OK(env_->NewWritableFile(fileNames[1], &wfile, soptions)); + wfile.reset(); + ASSERT_OK(env_->NewRandomRWFile(fileNames[1], &rwfile, soptions)); + + for (const auto& filename : fileNames) { + if (::stat(filename.c_str(), &sb) == 0) { + ASSERT_EQ(sb.st_mode & 0777, 0600); + } + env_->DeleteFile(filename); + } + } +} +#endif + +TEST_F(EnvPosixTest, MemoryMappedFileBuffer) { + const int kFileBytes = 1 << 15; // 32 KB + std::string expected_data; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + { + std::unique_ptr wfile; + const EnvOptions soptions; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + Random rnd(301); + test::RandomString(&rnd, kFileBytes, &expected_data); + ASSERT_OK(wfile->Append(expected_data)); + } + + std::unique_ptr mmap_buffer; + Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer); + // it should be supported at least on linux +#if !defined(OS_LINUX) + if (status.IsNotSupported()) { + fprintf(stderr, + "skipping EnvPosixTest.MemoryMappedFileBuffer due to " + "unsupported Env::NewMemoryMappedFileBuffer\n"); + return; + } +#endif // !defined(OS_LINUX) + + ASSERT_OK(status); + ASSERT_NE(nullptr, mmap_buffer.get()); + ASSERT_NE(nullptr, mmap_buffer->GetBase()); + ASSERT_EQ(kFileBytes, mmap_buffer->GetLen()); + std::string actual_data(reinterpret_cast(mmap_buffer->GetBase()), + mmap_buffer->GetLen()); + ASSERT_EQ(expected_data, actual_data); +} + +TEST_P(EnvPosixTestWithParam, UnSchedule) { + std::atomic called(false); + env_->SetBackgroundThreads(1, Env::LOW); + + /* Block the low priority queue */ + test::SleepingBackgroundTask sleeping_task, sleeping_task1; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task, + Env::Priority::LOW); + + /* Schedule another task */ + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task1, + Env::Priority::LOW, &sleeping_task1); + + /* Remove it with a different tag */ + ASSERT_EQ(0, env_->UnSchedule(&called, Env::Priority::LOW)); + + /* Remove it from the queue with the right tag */ + ASSERT_EQ(1, env_->UnSchedule(&sleeping_task1, Env::Priority::LOW)); + + // Unblock background thread + sleeping_task.WakeUp(); + + /* Schedule another task */ + env_->Schedule(&SetBool, &called); + for (int i = 0; i < kDelayMicros; i++) { + if (called.load()) { + break; + } + Env::Default()->SleepForMicroseconds(1); + } + ASSERT_TRUE(called.load()); + + ASSERT_TRUE(!sleeping_task.IsSleeping() && !sleeping_task1.IsSleeping()); + WaitThreadPoolsEmpty(); +} + +// This tests assumes that the last scheduled +// task will run last. In fact, in the allotted +// sleeping time nothing may actually run or they may +// run in any order. The purpose of the test is unclear. +#ifndef OS_WIN +TEST_P(EnvPosixTestWithParam, RunMany) { + std::atomic last_id(0); + + struct CB { + std::atomic* last_id_ptr; // Pointer to shared slot + int id; // Order# for the execution of this callback + + CB(std::atomic* p, int i) : last_id_ptr(p), id(i) {} + + static void Run(void* v) { + CB* cb = reinterpret_cast(v); + int cur = cb->last_id_ptr->load(); + ASSERT_EQ(cb->id - 1, cur); + cb->last_id_ptr->store(cb->id); + } + }; + + // Schedule in different order than start time + CB cb1(&last_id, 1); + CB cb2(&last_id, 2); + CB cb3(&last_id, 3); + CB cb4(&last_id, 4); + env_->Schedule(&CB::Run, &cb1); + env_->Schedule(&CB::Run, &cb2); + env_->Schedule(&CB::Run, &cb3); + env_->Schedule(&CB::Run, &cb4); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + int cur = last_id.load(std::memory_order_acquire); + ASSERT_EQ(4, cur); + WaitThreadPoolsEmpty(); +} +#endif + +struct State { + port::Mutex mu; + int val; + int num_running; +}; + +static void ThreadBody(void* arg) { + State* s = reinterpret_cast(arg); + s->mu.Lock(); + s->val += 1; + s->num_running -= 1; + s->mu.Unlock(); +} + +TEST_P(EnvPosixTestWithParam, StartThread) { + State state; + state.val = 0; + state.num_running = 3; + for (int i = 0; i < 3; i++) { + env_->StartThread(&ThreadBody, &state); + } + while (true) { + state.mu.Lock(); + int num = state.num_running; + state.mu.Unlock(); + if (num == 0) { + break; + } + Env::Default()->SleepForMicroseconds(kDelayMicros); + } + ASSERT_EQ(state.val, 3); + WaitThreadPoolsEmpty(); +} + +TEST_P(EnvPosixTestWithParam, TwoPools) { + // Data structures to signal tasks to run. + port::Mutex mutex; + port::CondVar cv(&mutex); + bool should_start = false; + + class CB { + public: + CB(const std::string& pool_name, int pool_size, port::Mutex* trigger_mu, + port::CondVar* trigger_cv, bool* _should_start) + : mu_(), + num_running_(0), + num_finished_(0), + pool_size_(pool_size), + pool_name_(pool_name), + trigger_mu_(trigger_mu), + trigger_cv_(trigger_cv), + should_start_(_should_start) {} + + static void Run(void* v) { + CB* cb = reinterpret_cast(v); + cb->Run(); + } + + void Run() { + { + MutexLock l(&mu_); + num_running_++; + // make sure we don't have more than pool_size_ jobs running. + ASSERT_LE(num_running_, pool_size_.load()); + } + + { + MutexLock l(trigger_mu_); + while (!(*should_start_)) { + trigger_cv_->Wait(); + } + } + + { + MutexLock l(&mu_); + num_running_--; + num_finished_++; + } + } + + int NumFinished() { + MutexLock l(&mu_); + return num_finished_; + } + + void Reset(int pool_size) { + pool_size_.store(pool_size); + num_finished_ = 0; + } + + private: + port::Mutex mu_; + int num_running_; + int num_finished_; + std::atomic pool_size_; + std::string pool_name_; + port::Mutex* trigger_mu_; + port::CondVar* trigger_cv_; + bool* should_start_; + }; + + const int kLowPoolSize = 2; + const int kHighPoolSize = 4; + const int kJobs = 8; + + CB low_pool_job("low", kLowPoolSize, &mutex, &cv, &should_start); + CB high_pool_job("high", kHighPoolSize, &mutex, &cv, &should_start); + + env_->SetBackgroundThreads(kLowPoolSize); + env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); + + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // schedule same number of jobs in each pool + for (int i = 0; i < kJobs; i++) { + env_->Schedule(&CB::Run, &low_pool_job); + env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); + } + // Wait a short while for the jobs to be dispatched. + int sleep_count = 0; + while ((unsigned int)(kJobs - kLowPoolSize) != + env_->GetThreadPoolQueueLen(Env::Priority::LOW) || + (unsigned int)(kJobs - kHighPoolSize) != + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) { + env_->SleepForMicroseconds(kDelayMicros); + if (++sleep_count > 100) { + break; + } + } + + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), + env_->GetThreadPoolQueueLen()); + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), + env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ((unsigned int)(kJobs - kHighPoolSize), + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // Trigger jobs to run. + { + MutexLock l(&mutex); + should_start = true; + cv.SignalAll(); + } + + // wait for all jobs to finish + while (low_pool_job.NumFinished() < kJobs || + high_pool_job.NumFinished() < kJobs) { + env_->SleepForMicroseconds(kDelayMicros); + } + + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // Hold jobs to schedule; + should_start = false; + + // call IncBackgroundThreadsIfNeeded to two pools. One increasing and + // the other decreasing + env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW); + env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH); + high_pool_job.Reset(kHighPoolSize + 1); + low_pool_job.Reset(kLowPoolSize); + + // schedule same number of jobs in each pool + for (int i = 0; i < kJobs; i++) { + env_->Schedule(&CB::Run, &low_pool_job); + env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH); + } + // Wait a short while for the jobs to be dispatched. + sleep_count = 0; + while ((unsigned int)(kJobs - kLowPoolSize) != + env_->GetThreadPoolQueueLen(Env::Priority::LOW) || + (unsigned int)(kJobs - (kHighPoolSize + 1)) != + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) { + env_->SleepForMicroseconds(kDelayMicros); + if (++sleep_count > 100) { + break; + } + } + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), + env_->GetThreadPoolQueueLen()); + ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize), + env_->GetThreadPoolQueueLen(Env::Priority::LOW)); + ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)), + env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + // Trigger jobs to run. + { + MutexLock l(&mutex); + should_start = true; + cv.SignalAll(); + } + + // wait for all jobs to finish + while (low_pool_job.NumFinished() < kJobs || + high_pool_job.NumFinished() < kJobs) { + env_->SleepForMicroseconds(kDelayMicros); + } + + env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH); + WaitThreadPoolsEmpty(); +} + +TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) { + std::vector tasks(10); + + // Set number of thread to 1 first. + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + + // Schedule 3 tasks. 0 running; Task 1, 2 waiting. + for (size_t i = 0; i < 3; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + } + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Increase to 2 threads. Task 0, 1 running; 2 waiting + env_->SetBackgroundThreads(2, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // The last task finishes. Task 0 running, 2 waiting. + tasks[1].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(!tasks[1].IsSleeping()); + ASSERT_TRUE(!tasks[2].IsSleeping()); + + // Increase to 5 threads. Task 0 and 2 running. + env_->SetBackgroundThreads(5, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[0].IsSleeping()); + ASSERT_TRUE(tasks[2].IsSleeping()); + + // Change number of threads a couple of times while there is no sufficient + // tasks. + env_->SetBackgroundThreads(7, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + tasks[2].WakeUp(); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(3, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(5, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + + Env::Default()->SleepForMicroseconds(kDelayMicros * 50); + + // Enqueue 5 more tasks. Thread pool size now is 4. + // Task 0, 3, 4, 5 running;6, 7 waiting. + for (size_t i = 3; i < 8; i++) { + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i], + Env::Priority::HIGH); + } + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[3].IsSleeping()); + ASSERT_TRUE(tasks[4].IsSleeping()); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(!tasks[6].IsSleeping()); + ASSERT_TRUE(!tasks[7].IsSleeping()); + + // Wake up task 0, 3 and 4. Task 5, 6, 7 running. + tasks[0].WakeUp(); + tasks[3].WakeUp(); + tasks[4].WakeUp(); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + for (size_t i = 5; i < 8; i++) { + ASSERT_TRUE(tasks[i].IsSleeping()); + } + + // Shrink back to 1 thread. Still task 5, 6, 7 running + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(tasks[6].IsSleeping()); + ASSERT_TRUE(tasks[7].IsSleeping()); + + // Wake up task 6. Task 5, 7 running + tasks[6].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(tasks[5].IsSleeping()); + ASSERT_TRUE(!tasks[6].IsSleeping()); + ASSERT_TRUE(tasks[7].IsSleeping()); + + // Wake up threads 7. Task 5 running + tasks[7].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[7].IsSleeping()); + + // Enqueue thread 8 and 9. Task 5 running; one of 8, 9 might be running. + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[8], + Env::Priority::HIGH); + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[9], + Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0); + ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping()); + + // Increase to 4 threads. Task 5, 8, 9 running. + env_->SetBackgroundThreads(4, Env::Priority::HIGH); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH)); + ASSERT_TRUE(tasks[8].IsSleeping()); + ASSERT_TRUE(tasks[9].IsSleeping()); + + // Shrink to 1 thread + env_->SetBackgroundThreads(1, Env::Priority::HIGH); + + // Wake up thread 9. + tasks[9].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[9].IsSleeping()); + ASSERT_TRUE(tasks[8].IsSleeping()); + + // Wake up thread 8 + tasks[8].WakeUp(); + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[8].IsSleeping()); + + // Wake up the last thread + tasks[5].WakeUp(); + + Env::Default()->SleepForMicroseconds(kDelayMicros); + ASSERT_TRUE(!tasks[5].IsSleeping()); + WaitThreadPoolsEmpty(); +} + +#if (defined OS_LINUX || defined OS_WIN) +// Travis doesn't support fallocate or getting unique ID from files for whatever +// reason. +#ifndef TRAVIS + +namespace { +bool IsSingleVarint(const std::string& s) { + Slice slice(s); + + uint64_t v; + if (!GetVarint64(&slice, &v)) { + return false; + } + + return slice.size() == 0; +} + +bool IsUniqueIDValid(const std::string& s) { + return !s.empty() && !IsSingleVarint(s); +} + +const size_t MAX_ID_SIZE = 100; +char temp_id[MAX_ID_SIZE]; + + +} // namespace + +// Determine whether we can use the FS_IOC_GETVERSION ioctl +// on a file in directory DIR. Create a temporary file therein, +// try to apply the ioctl (save that result), cleanup and +// return the result. Return true if it is supported, and +// false if anything fails. +// Note that this function "knows" that dir has just been created +// and is empty, so we create a simply-named test file: "f". +bool ioctl_support__FS_IOC_GETVERSION(const std::string& dir) { +#ifdef OS_WIN + return true; +#else + const std::string file = dir + "/f"; + int fd; + do { + fd = open(file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + } while (fd < 0 && errno == EINTR); + long int version; + bool ok = (fd >= 0 && ioctl(fd, FS_IOC_GETVERSION, &version) >= 0); + + close(fd); + unlink(file.c_str()); + + return ok; +#endif +} + +// To ensure that Env::GetUniqueId-related tests work correctly, the files +// should be stored in regular storage like "hard disk" or "flash device", +// and not on a tmpfs file system (like /dev/shm and /tmp on some systems). +// Otherwise we cannot get the correct id. +// +// This function serves as the replacement for test::TmpDir(), which may be +// customized to be on a file system that doesn't work with GetUniqueId(). + +class IoctlFriendlyTmpdir { + public: + explicit IoctlFriendlyTmpdir() { + char dir_buf[100]; + + const char *fmt = "%s/rocksdb.XXXXXX"; + const char *tmp = getenv("TEST_IOCTL_FRIENDLY_TMPDIR"); + +#ifdef OS_WIN +#define rmdir _rmdir + if(tmp == nullptr) { + tmp = getenv("TMP"); + } + + snprintf(dir_buf, sizeof dir_buf, fmt, tmp); + auto result = _mktemp(dir_buf); + assert(result != nullptr); + BOOL ret = CreateDirectory(dir_buf, NULL); + assert(ret == TRUE); + dir_ = dir_buf; +#else + std::list candidate_dir_list = {"/var/tmp", "/tmp"}; + + // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use + // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and + // add 1 for the trailing NUL byte. + if (tmp && strlen(tmp) + strlen(fmt) - 2 + 1 <= sizeof dir_buf) { + // use $TEST_IOCTL_FRIENDLY_TMPDIR value + candidate_dir_list.push_front(tmp); + } + + for (const std::string& d : candidate_dir_list) { + snprintf(dir_buf, sizeof dir_buf, fmt, d.c_str()); + if (mkdtemp(dir_buf)) { + if (ioctl_support__FS_IOC_GETVERSION(dir_buf)) { + dir_ = dir_buf; + return; + } else { + // Diagnose ioctl-related failure only if this is the + // directory specified via that envvar. + if (tmp && tmp == d) { + fprintf(stderr, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is " + "not suitable: %s\n", d.c_str()); + } + rmdir(dir_buf); // ignore failure + } + } else { + // mkdtemp failed: diagnose it, but don't give up. + fprintf(stderr, "mkdtemp(%s/...) failed: %s\n", d.c_str(), + strerror(errno)); + } + } + + fprintf(stderr, "failed to find an ioctl-friendly temporary directory;" + " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n"); + std::abort(); +#endif +} + + ~IoctlFriendlyTmpdir() { + rmdir(dir_.c_str()); + } + + const std::string& name() const { + return dir_; + } + + private: + std::string dir_; +}; + +#ifndef ROCKSDB_LITE +TEST_F(EnvPosixTest, PositionedAppend) { + std::unique_ptr writable_file; + EnvOptions options; + options.use_direct_writes = true; + options.use_mmap_writes = false; + IoctlFriendlyTmpdir ift; + ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options)); + const size_t kBlockSize = 4096; + const size_t kDataSize = kPageSize; + // Write a page worth of 'a' + auto data_ptr = NewAligned(kDataSize, 'a'); + Slice data_a(data_ptr.get(), kDataSize); + ASSERT_OK(writable_file->PositionedAppend(data_a, 0U)); + // Write a page worth of 'b' right after the first sector + data_ptr = NewAligned(kDataSize, 'b'); + Slice data_b(data_ptr.get(), kDataSize); + ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize)); + ASSERT_OK(writable_file->Close()); + // The file now has 1 sector worth of a followed by a page worth of b + + // Verify the above + std::unique_ptr seq_file; + ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options)); + char scratch[kPageSize * 2]; + Slice result; + ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch)); + ASSERT_EQ(kPageSize + kBlockSize, result.size()); + ASSERT_EQ('a', result[kBlockSize - 1]); + ASSERT_EQ('b', result[kBlockSize]); +} +#endif // !ROCKSDB_LITE + +// Only works in linux platforms +TEST_P(EnvPosixTestWithParam, RandomAccessUniqueID) { + // Create file. + if (env_ == Env::Default()) { + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + IoctlFriendlyTmpdir ift; + std::string fname = ift.name() + "/testfile"; + std::unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + std::unique_ptr file; + + // Get Unique ID + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + std::string unique_id1(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id1)); + + // Get Unique ID again + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + std::string unique_id2(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id2)); + + // Get Unique ID again after waiting some time. + env_->SleepForMicroseconds(1000000); + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + std::string unique_id3(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id3)); + + // Check IDs are the same. + ASSERT_EQ(unique_id1, unique_id2); + ASSERT_EQ(unique_id2, unique_id3); + + // Delete the file + env_->DeleteFile(fname); + } +} + +// only works in linux platforms +#ifdef ROCKSDB_FALLOCATE_PRESENT +TEST_P(EnvPosixTestWithParam, AllocateTest) { + if (env_ == Env::Default()) { + IoctlFriendlyTmpdir ift; + std::string fname = ift.name() + "/preallocate_testfile"; + + // Try fallocate in a file to see whether the target file system supports + // it. + // Skip the test if fallocate is not supported. + std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2"; + int fd = -1; + do { + fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644); + } while (fd < 0 && errno == EINTR); + ASSERT_GT(fd, 0); + + int alloc_status = fallocate(fd, 0, 0, 1); + + int err_number = 0; + if (alloc_status != 0) { + err_number = errno; + fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number)); + } + close(fd); + ASSERT_OK(env_->DeleteFile(fname_test_fallocate)); + if (alloc_status != 0 && err_number == EOPNOTSUPP) { + // The filesystem containing the file does not support fallocate + return; + } + + EnvOptions soptions; + soptions.use_mmap_writes = false; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + std::unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + + // allocate 100 MB + size_t kPreallocateSize = 100 * 1024 * 1024; + size_t kBlockSize = 512; + size_t kPageSize = 4096; + size_t kDataSize = 1024 * 1024; + auto data_ptr = NewAligned(kDataSize, 'A'); + Slice data(data_ptr.get(), kDataSize); + wfile->SetPreallocationBlockSize(kPreallocateSize); + wfile->PrepareWrite(wfile->GetFileSize(), kDataSize); + ASSERT_OK(wfile->Append(data)); + ASSERT_OK(wfile->Flush()); + + struct stat f_stat; + ASSERT_EQ(stat(fname.c_str(), &f_stat), 0); + ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size); + // verify that blocks are preallocated + // Note here that we don't check the exact number of blocks preallocated -- + // we only require that number of allocated blocks is at least what we + // expect. + // It looks like some FS give us more blocks that we asked for. That's fine. + // It might be worth investigating further. + ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks); + + // close the file, should deallocate the blocks + wfile.reset(); + + stat(fname.c_str(), &f_stat); + ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size); + // verify that preallocated blocks were deallocated on file close + // Because the FS might give us more blocks, we add a full page to the size + // and expect the number of blocks to be less or equal to that. + ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize, + (unsigned int)f_stat.st_blocks); + } +} +#endif // ROCKSDB_FALLOCATE_PRESENT + +// Returns true if any of the strings in ss are the prefix of another string. +bool HasPrefix(const std::unordered_set& ss) { + for (const std::string& s: ss) { + if (s.empty()) { + return true; + } + for (size_t i = 1; i < s.size(); ++i) { + if (ss.count(s.substr(0, i)) != 0) { + return true; + } + } + } + return false; +} + +// Only works in linux and WIN platforms +TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDConcurrent) { + if (env_ == Env::Default()) { + // Check whether a bunch of concurrently existing files have unique IDs. + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + + // Create the files + IoctlFriendlyTmpdir ift; + std::vector fnames; + for (int i = 0; i < 1000; ++i) { + fnames.push_back(ift.name() + "/" + "testfile" + ToString(i)); + + // Create file. + std::unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions)); + } + + // Collect and check whether the IDs are unique. + std::unordered_set ids; + for (const std::string fname : fnames) { + std::unique_ptr file; + std::string unique_id; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + unique_id = std::string(temp_id, id_size); + ASSERT_TRUE(IsUniqueIDValid(unique_id)); + + ASSERT_TRUE(ids.count(unique_id) == 0); + ids.insert(unique_id); + } + + // Delete the files + for (const std::string fname : fnames) { + ASSERT_OK(env_->DeleteFile(fname)); + } + + ASSERT_TRUE(!HasPrefix(ids)); + } +} + +// Only works in linux and WIN platforms +TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) { + if (env_ == Env::Default()) { + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + + IoctlFriendlyTmpdir ift; + std::string fname = ift.name() + "/" + "testfile"; + + // Check that after file is deleted we don't get same ID again in a new + // file. + std::unordered_set ids; + for (int i = 0; i < 1000; ++i) { + // Create file. + { + std::unique_ptr wfile; + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + } + + // Get Unique ID + std::string unique_id; + { + std::unique_ptr file; + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE); + ASSERT_TRUE(id_size > 0); + unique_id = std::string(temp_id, id_size); + } + + ASSERT_TRUE(IsUniqueIDValid(unique_id)); + ASSERT_TRUE(ids.count(unique_id) == 0); + ids.insert(unique_id); + + // Delete the file + ASSERT_OK(env_->DeleteFile(fname)); + } + + ASSERT_TRUE(!HasPrefix(ids)); + } +} + +// Only works in linux platforms +#ifdef OS_WIN +TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) { +#else +TEST_P(EnvPosixTestWithParam, InvalidateCache) { +#endif + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + std::string fname = test::PerThreadDBPath(env_, "testfile"); + + const size_t kSectorSize = 512; + auto data = NewAligned(kSectorSize, 0); + Slice slice(data.get(), kSectorSize); + + // Create file. + { + std::unique_ptr wfile; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) + if (soptions.use_direct_writes) { + soptions.use_direct_writes = false; + } +#endif + ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions)); + ASSERT_OK(wfile->Append(slice)); + ASSERT_OK(wfile->InvalidateCache(0, 0)); + ASSERT_OK(wfile->Close()); + } + + // Random Read + { + std::unique_ptr file; + auto scratch = NewAligned(kSectorSize, 0); + Slice result; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) + if (soptions.use_direct_reads) { + soptions.use_direct_reads = false; + } +#endif + ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions)); + ASSERT_OK(file->Read(0, kSectorSize, &result, scratch.get())); + ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0); + ASSERT_OK(file->InvalidateCache(0, 11)); + ASSERT_OK(file->InvalidateCache(0, 0)); + } + + // Sequential Read + { + std::unique_ptr file; + auto scratch = NewAligned(kSectorSize, 0); + Slice result; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) + if (soptions.use_direct_reads) { + soptions.use_direct_reads = false; + } +#endif + ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions)); + if (file->use_direct_io()) { + ASSERT_OK(file->PositionedRead(0, kSectorSize, &result, scratch.get())); + } else { + ASSERT_OK(file->Read(kSectorSize, &result, scratch.get())); + } + ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0); + ASSERT_OK(file->InvalidateCache(0, 11)); + ASSERT_OK(file->InvalidateCache(0, 0)); + } + // Delete the file + ASSERT_OK(env_->DeleteFile(fname)); + rocksdb::SyncPoint::GetInstance()->ClearTrace(); +} +#endif // not TRAVIS +#endif // OS_LINUX || OS_WIN + +class TestLogger : public Logger { + public: + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + log_count++; + + char new_format[550]; + std::fill_n(new_format, sizeof(new_format), '2'); + { + va_list backup_ap; + va_copy(backup_ap, ap); + int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap); + // 48 bytes for extra information + bytes allocated + +// When we have n == -1 there is not a terminating zero expected +#ifdef OS_WIN + if (n < 0) { + char_0_count++; + } +#endif + + if (new_format[0] == '[') { + // "[DEBUG] " + ASSERT_TRUE(n <= 56 + (512 - static_cast(sizeof(struct timeval)))); + } else { + ASSERT_TRUE(n <= 48 + (512 - static_cast(sizeof(struct timeval)))); + } + va_end(backup_ap); + } + + for (size_t i = 0; i < sizeof(new_format); i++) { + if (new_format[i] == 'x') { + char_x_count++; + } else if (new_format[i] == '\0') { + char_0_count++; + } + } + } + int log_count; + int char_x_count; + int char_0_count; +}; + +TEST_P(EnvPosixTestWithParam, LogBufferTest) { + TestLogger test_logger; + test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + test_logger.log_count = 0; + test_logger.char_x_count = 0; + test_logger.char_0_count = 0; + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger); + LogBuffer log_buffer_debug(DEBUG_LEVEL, &test_logger); + + char bytes200[200]; + std::fill_n(bytes200, sizeof(bytes200), '1'); + bytes200[sizeof(bytes200) - 1] = '\0'; + char bytes600[600]; + std::fill_n(bytes600, sizeof(bytes600), '1'); + bytes600[sizeof(bytes600) - 1] = '\0'; + char bytes9000[9000]; + std::fill_n(bytes9000, sizeof(bytes9000), '1'); + bytes9000[sizeof(bytes9000) - 1] = '\0'; + + ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes200); + ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes600); + ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx%sx", bytes200, bytes200, bytes200); + ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes200, bytes600); + ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes600, bytes9000); + + ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx", bytes200); + test_logger.SetInfoLogLevel(DEBUG_LEVEL); + ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx%sx%sx", bytes600, bytes9000, + bytes200); + + ASSERT_EQ(0, test_logger.log_count); + log_buffer.FlushBufferToLog(); + log_buffer_debug.FlushBufferToLog(); + ASSERT_EQ(6, test_logger.log_count); + ASSERT_EQ(6, test_logger.char_0_count); + ASSERT_EQ(10, test_logger.char_x_count); +} + +class TestLogger2 : public Logger { + public: + explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {} + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + char new_format[2000]; + std::fill_n(new_format, sizeof(new_format), '2'); + { + va_list backup_ap; + va_copy(backup_ap, ap); + int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap); + // 48 bytes for extra information + bytes allocated + ASSERT_TRUE( + n <= 48 + static_cast(max_log_size_ - sizeof(struct timeval))); + ASSERT_TRUE(n > static_cast(max_log_size_ - sizeof(struct timeval))); + va_end(backup_ap); + } + } + size_t max_log_size_; +}; + +TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) { + char bytes9000[9000]; + std::fill_n(bytes9000, sizeof(bytes9000), '1'); + bytes9000[sizeof(bytes9000) - 1] = '\0'; + + for (size_t max_log_size = 256; max_log_size <= 1024; + max_log_size += 1024 - 256) { + TestLogger2 test_logger(max_log_size); + test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger); + ROCKS_LOG_BUFFER_MAX_SZ(&log_buffer, max_log_size, "%s", bytes9000); + log_buffer.FlushBufferToLog(); + } +} + +TEST_P(EnvPosixTestWithParam, Preallocation) { + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + const std::string src = test::PerThreadDBPath(env_, "testfile"); + std::unique_ptr srcfile; + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD) + if (soptions.use_direct_writes) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } +#endif + ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions)); + srcfile->SetPreallocationBlockSize(1024 * 1024); + + // No writes should mean no preallocation + size_t block_size, last_allocated_block; + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 0UL); + + // Small write should preallocate one block + size_t kStrSize = 4096; + auto data = NewAligned(kStrSize, 'A'); + Slice str(data.get(), kStrSize); + srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize); + srcfile->Append(str); + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 1UL); + + // Write an entire preallocation block, make sure we increased by two. + { + auto buf_ptr = NewAligned(block_size, ' '); + Slice buf(buf_ptr.get(), block_size); + srcfile->PrepareWrite(srcfile->GetFileSize(), block_size); + srcfile->Append(buf); + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 2UL); + } + + // Write five more blocks at once, ensure we're where we need to be. + { + auto buf_ptr = NewAligned(block_size * 5, ' '); + Slice buf = Slice(buf_ptr.get(), block_size * 5); + srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size()); + srcfile->Append(buf); + srcfile->GetPreallocationStatus(&block_size, &last_allocated_block); + ASSERT_EQ(last_allocated_block, 7UL); + } + rocksdb::SyncPoint::GetInstance()->ClearTrace(); +} + +// Test that the two ways to get children file attributes (in bulk or +// individually) behave consistently. +TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + EnvOptions soptions; + soptions.use_direct_reads = soptions.use_direct_writes = direct_io_; + const int kNumChildren = 10; + + std::string data; + for (int i = 0; i < kNumChildren; ++i) { + const std::string path = + test::TmpDir(env_) + "/" + "testfile_" + std::to_string(i); + std::unique_ptr file; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD) + if (soptions.use_direct_writes) { + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + } +#endif + ASSERT_OK(env_->NewWritableFile(path, &file, soptions)); + auto buf_ptr = NewAligned(data.size(), 'T'); + Slice buf(buf_ptr.get(), data.size()); + file->Append(buf); + data.append(std::string(4096, 'T')); + } + + std::vector file_attrs; + ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs)); + for (int i = 0; i < kNumChildren; ++i) { + const std::string name = "testfile_" + std::to_string(i); + const std::string path = test::TmpDir(env_) + "/" + name; + + auto file_attrs_iter = std::find_if( + file_attrs.begin(), file_attrs.end(), + [&name](const Env::FileAttributes& fm) { return fm.name == name; }); + ASSERT_TRUE(file_attrs_iter != file_attrs.end()); + uint64_t size; + ASSERT_OK(env_->GetFileSize(path, &size)); + ASSERT_EQ(size, 4096 * i); + ASSERT_EQ(size, file_attrs_iter->size_bytes); + } + rocksdb::SyncPoint::GetInstance()->ClearTrace(); +} + +// Test that all WritableFileWrapper forwards all calls to WritableFile. +TEST_P(EnvPosixTestWithParam, WritableFileWrapper) { + class Base : public WritableFile { + public: + mutable int *step_; + + void inc(int x) const { + EXPECT_EQ(x, (*step_)++); + } + + explicit Base(int* step) : step_(step) { + inc(0); + } + + Status Append(const Slice& /*data*/) override { + inc(1); + return Status::OK(); + } + + Status PositionedAppend(const Slice& /*data*/, + uint64_t /*offset*/) override { + inc(2); + return Status::OK(); + } + + Status Truncate(uint64_t /*size*/) override { + inc(3); + return Status::OK(); + } + + Status Close() override { + inc(4); + return Status::OK(); + } + + Status Flush() override { + inc(5); + return Status::OK(); + } + + Status Sync() override { + inc(6); + return Status::OK(); + } + + Status Fsync() override { + inc(7); + return Status::OK(); + } + + bool IsSyncThreadSafe() const override { + inc(8); + return true; + } + + bool use_direct_io() const override { + inc(9); + return true; + } + + size_t GetRequiredBufferAlignment() const override { + inc(10); + return 0; + } + + void SetIOPriority(Env::IOPriority /*pri*/) override { inc(11); } + + Env::IOPriority GetIOPriority() override { + inc(12); + return Env::IOPriority::IO_LOW; + } + + void SetWriteLifeTimeHint(Env::WriteLifeTimeHint /*hint*/) override { + inc(13); + } + + Env::WriteLifeTimeHint GetWriteLifeTimeHint() override { + inc(14); + return Env::WriteLifeTimeHint::WLTH_NOT_SET; + } + + uint64_t GetFileSize() override { + inc(15); + return 0; + } + + void SetPreallocationBlockSize(size_t /*size*/) override { inc(16); } + + void GetPreallocationStatus(size_t* /*block_size*/, + size_t* /*last_allocated_block*/) override { + inc(17); + } + + size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override { + inc(18); + return 0; + } + + Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override { + inc(19); + return Status::OK(); + } + + Status RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/) override { + inc(20); + return Status::OK(); + } + + void PrepareWrite(size_t /*offset*/, size_t /*len*/) override { inc(21); } + + Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override { + inc(22); + return Status::OK(); + } + + public: + ~Base() override { inc(23); } + }; + + class Wrapper : public WritableFileWrapper { + public: + explicit Wrapper(WritableFile* target) : WritableFileWrapper(target) {} + }; + + int step = 0; + + { + Base b(&step); + Wrapper w(&b); + w.Append(Slice()); + w.PositionedAppend(Slice(), 0); + w.Truncate(0); + w.Close(); + w.Flush(); + w.Sync(); + w.Fsync(); + w.IsSyncThreadSafe(); + w.use_direct_io(); + w.GetRequiredBufferAlignment(); + w.SetIOPriority(Env::IOPriority::IO_HIGH); + w.GetIOPriority(); + w.SetWriteLifeTimeHint(Env::WriteLifeTimeHint::WLTH_NOT_SET); + w.GetWriteLifeTimeHint(); + w.GetFileSize(); + w.SetPreallocationBlockSize(0); + w.GetPreallocationStatus(nullptr, nullptr); + w.GetUniqueId(nullptr, 0); + w.InvalidateCache(0, 0); + w.RangeSync(0, 0); + w.PrepareWrite(0, 0); + w.Allocate(0, 0); + } + + EXPECT_EQ(24, step); +} + +TEST_P(EnvPosixTestWithParam, PosixRandomRWFile) { + const std::string path = test::PerThreadDBPath(env_, "random_rw_file"); + + env_->DeleteFile(path); + + std::unique_ptr file; + + // Cannot open non-existing file. + ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions())); + + // Create the file using WriteableFile + { + std::unique_ptr wf; + ASSERT_OK(env_->NewWritableFile(path, &wf, EnvOptions())); + } + + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + + char buf[10000]; + Slice read_res; + + ASSERT_OK(file->Write(0, "ABCD")); + ASSERT_OK(file->Read(0, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABCD"); + + ASSERT_OK(file->Write(2, "XXXX")); + ASSERT_OK(file->Read(0, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXXX"); + + ASSERT_OK(file->Write(10, "ZZZ")); + ASSERT_OK(file->Read(10, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ZZZ"); + + ASSERT_OK(file->Write(11, "Y")); + ASSERT_OK(file->Read(10, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ZYZ"); + + ASSERT_OK(file->Write(200, "FFFFF")); + ASSERT_OK(file->Read(200, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "FFFFF"); + + ASSERT_OK(file->Write(205, "XXXX")); + ASSERT_OK(file->Read(200, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "FFFFFXXXX"); + + ASSERT_OK(file->Write(5, "QQQQ")); + ASSERT_OK(file->Read(0, 9, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ"); + + ASSERT_OK(file->Read(2, 4, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "XXXQ"); + + // Close file and reopen it + file->Close(); + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + + ASSERT_OK(file->Read(0, 9, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ"); + + ASSERT_OK(file->Read(10, 3, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ZYZ"); + + ASSERT_OK(file->Read(200, 9, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "FFFFFXXXX"); + + ASSERT_OK(file->Write(4, "TTTTTTTTTTTTTTTT")); + ASSERT_OK(file->Read(0, 10, &read_res, buf)); + ASSERT_EQ(read_res.ToString(), "ABXXTTTTTT"); + + // Clean up + env_->DeleteFile(path); +} + +class RandomRWFileWithMirrorString { + public: + explicit RandomRWFileWithMirrorString(RandomRWFile* _file) : file_(_file) {} + + void Write(size_t offset, const std::string& data) { + // Write to mirror string + StringWrite(offset, data); + + // Write to file + Status s = file_->Write(offset, data); + ASSERT_OK(s) << s.ToString(); + } + + void Read(size_t offset = 0, size_t n = 1000000) { + Slice str_res(nullptr, 0); + if (offset < file_mirror_.size()) { + size_t str_res_sz = std::min(file_mirror_.size() - offset, n); + str_res = Slice(file_mirror_.data() + offset, str_res_sz); + StopSliceAtNull(&str_res); + } + + Slice file_res; + Status s = file_->Read(offset, n, &file_res, buf_); + ASSERT_OK(s) << s.ToString(); + StopSliceAtNull(&file_res); + + ASSERT_EQ(str_res.ToString(), file_res.ToString()) << offset << " " << n; + } + + void SetFile(RandomRWFile* _file) { file_ = _file; } + + private: + void StringWrite(size_t offset, const std::string& src) { + if (offset + src.size() > file_mirror_.size()) { + file_mirror_.resize(offset + src.size(), '\0'); + } + + char* pos = const_cast(file_mirror_.data() + offset); + memcpy(pos, src.data(), src.size()); + } + + void StopSliceAtNull(Slice* slc) { + for (size_t i = 0; i < slc->size(); i++) { + if ((*slc)[i] == '\0') { + *slc = Slice(slc->data(), i); + break; + } + } + } + + char buf_[10000]; + RandomRWFile* file_; + std::string file_mirror_; +}; + +TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) { + const std::string path = test::PerThreadDBPath(env_, "random_rw_file_rand"); + env_->DeleteFile(path); + + std::unique_ptr file; + +#ifdef OS_LINUX + // Cannot open non-existing file. + ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions())); +#endif + + // Create the file using WriteableFile + { + std::unique_ptr wf; + ASSERT_OK(env_->NewWritableFile(path, &wf, EnvOptions())); + } + + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + RandomRWFileWithMirrorString file_with_mirror(file.get()); + + Random rnd(301); + std::string buf; + for (int i = 0; i < 10000; i++) { + // Genrate random data + test::RandomString(&rnd, 10, &buf); + + // Pick random offset for write + size_t write_off = rnd.Next() % 1000; + file_with_mirror.Write(write_off, buf); + + // Pick random offset for read + size_t read_off = rnd.Next() % 1000; + size_t read_sz = rnd.Next() % 20; + file_with_mirror.Read(read_off, read_sz); + + if (i % 500 == 0) { + // Reopen the file every 500 iters + ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions())); + file_with_mirror.SetFile(file.get()); + } + } + + // clean up + env_->DeleteFile(path); +} + +class TestEnv : public EnvWrapper { + public: + explicit TestEnv() : EnvWrapper(Env::Default()), + close_count(0) { } + + class TestLogger : public Logger { + public: + using Logger::Logv; + TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; } + ~TestLogger() override { + if (!closed_) { + CloseHelper(); + } + } + void Logv(const char* /*format*/, va_list /*ap*/) override{}; + + protected: + Status CloseImpl() override { return CloseHelper(); } + + private: + Status CloseHelper() { + env->CloseCountInc();; + return Status::OK(); + } + TestEnv* env; + }; + + void CloseCountInc() { close_count++; } + + int GetCloseCount() { return close_count; } + + Status NewLogger(const std::string& /*fname*/, + std::shared_ptr* result) override { + result->reset(new TestLogger(this)); + return Status::OK(); + } + + private: + int close_count; +}; + +class EnvTest : public testing::Test {}; + +TEST_F(EnvTest, Close) { + TestEnv* env = new TestEnv(); + std::shared_ptr logger; + Status s; + + s = env->NewLogger("", &logger); + ASSERT_EQ(s, Status::OK()); + logger.get()->Close(); + ASSERT_EQ(env->GetCloseCount(), 1); + // Call Close() again. CloseHelper() should not be called again + logger.get()->Close(); + ASSERT_EQ(env->GetCloseCount(), 1); + logger.reset(); + ASSERT_EQ(env->GetCloseCount(), 1); + + s = env->NewLogger("", &logger); + ASSERT_EQ(s, Status::OK()); + logger.reset(); + ASSERT_EQ(env->GetCloseCount(), 2); + + delete env; +} + +INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(Env::Default(), + false))); +#if !defined(ROCKSDB_LITE) +INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(Env::Default(), + true))); +#endif // !defined(ROCKSDB_LITE) + +#if !defined(ROCKSDB_LITE) && !defined(OS_WIN) +static std::unique_ptr chroot_env( + NewChrootEnv(Env::Default(), test::TmpDir(Env::Default()))); +INSTANTIATE_TEST_CASE_P( + ChrootEnvWithoutDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(chroot_env.get(), false))); +INSTANTIATE_TEST_CASE_P( + ChrootEnvWithDirectIO, EnvPosixTestWithParam, + ::testing::Values(std::pair(chroot_env.get(), true))); +#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN) + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/env/io_posix.cc b/src/rocksdb/env/io_posix.cc new file mode 100644 index 00000000..628ed841 --- /dev/null +++ b/src/rocksdb/env/io_posix.cc @@ -0,0 +1,1082 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifdef ROCKSDB_LIB_IO_POSIX +#include "env/io_posix.h" +#include +#include +#include +#if defined(OS_LINUX) +#include +#endif +#include +#include +#include +#include +#include +#include +#include +#ifdef OS_LINUX +#include +#include +#include +#endif +#include "env/posix_logger.h" +#include "monitoring/iostats_context_imp.h" +#include "port/port.h" +#include "rocksdb/slice.h" +#include "util/coding.h" +#include "util/string_util.h" +#include "util/sync_point.h" + +#if defined(OS_LINUX) && !defined(F_SET_RW_HINT) +#define F_LINUX_SPECIFIC_BASE 1024 +#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12) +#endif + +namespace rocksdb { + +// A wrapper for fadvise, if the platform doesn't support fadvise, +// it will simply return 0. +int Fadvise(int fd, off_t offset, size_t len, int advice) { +#ifdef OS_LINUX + return posix_fadvise(fd, offset, len, advice); +#else + (void)fd; + (void)offset; + (void)len; + (void)advice; + return 0; // simply do nothing. +#endif +} + +namespace { +size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) { +#ifdef OS_LINUX + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return kDefaultPageSize; + } + if (major(buf.st_dev) == 0) { + // Unnamed devices (e.g. non-device mounts), reserved as null device number. + // These don't have an entry in /sys/dev/block/. Return a sensible default. + return kDefaultPageSize; + } + + // Reading queue/logical_block_size does not require special permissions. + const int kBufferSize = 100; + char path[kBufferSize]; + char real_path[PATH_MAX + 1]; + snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev), + minor(buf.st_dev)); + if (realpath(path, real_path) == nullptr) { + return kDefaultPageSize; + } + std::string device_dir(real_path); + if (!device_dir.empty() && device_dir.back() == '/') { + device_dir.pop_back(); + } + // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda + // and nvme0n1 have it. + // $ ls -al '/sys/dev/block/8:3' + // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 -> + // ../../block/sda/sda3 + // $ ls -al '/sys/dev/block/259:4' + // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 -> + // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1 + size_t parent_end = device_dir.rfind('/', device_dir.length() - 1); + if (parent_end == std::string::npos) { + return kDefaultPageSize; + } + size_t parent_begin = device_dir.rfind('/', parent_end - 1); + if (parent_begin == std::string::npos) { + return kDefaultPageSize; + } + std::string parent = + device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1); + std::string child = device_dir.substr(parent_end + 1, std::string::npos); + if (parent != "block" && + (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) { + device_dir = device_dir.substr(0, parent_end); + } + std::string fname = device_dir + "/queue/logical_block_size"; + FILE* fp; + size_t size = 0; + fp = fopen(fname.c_str(), "r"); + if (fp != nullptr) { + char* line = nullptr; + size_t len = 0; + if (getline(&line, &len, fp) != -1) { + sscanf(line, "%zu", &size); + } + free(line); + fclose(fp); + } + if (size != 0 && (size & (size - 1)) == 0) { + return size; + } +#endif + return kDefaultPageSize; +} +} // namespace + +/* + * DirectIOHelper + */ +#ifndef NDEBUG +namespace { + +bool IsSectorAligned(const size_t off, size_t sector_size) { + return off % sector_size == 0; +} + +bool IsSectorAligned(const void* ptr, size_t sector_size) { + return uintptr_t(ptr) % sector_size == 0; +} + +} +#endif + +/* + * PosixSequentialFile + */ +PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file, + int fd, const EnvOptions& options) + : filename_(fname), + file_(file), + fd_(fd), + use_direct_io_(options.use_direct_reads), + logical_sector_size_(GetLogicalBufferSize(fd_)) { + assert(!options.use_direct_reads || !options.use_mmap_reads); +} + +PosixSequentialFile::~PosixSequentialFile() { + if (!use_direct_io()) { + assert(file_); + fclose(file_); + } else { + assert(fd_); + close(fd_); + } +} + +Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) { + assert(result != nullptr && !use_direct_io()); + Status s; + size_t r = 0; + do { + r = fread_unlocked(scratch, 1, n, file_); + } while (r == 0 && ferror(file_) && errno == EINTR); + *result = Slice(scratch, r); + if (r < n) { + if (feof(file_)) { + // We leave status as ok if we hit the end of the file + // We also clear the error so that the reads can continue + // if a new data is written to the file + clearerr(file_); + } else { + // A partial read with an error: return a non-ok status + s = IOError("While reading file sequentially", filename_, errno); + } + } + return s; +} + +Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n, + Slice* result, char* scratch) { + assert(use_direct_io()); + assert(IsSectorAligned(offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(n, GetRequiredBufferAlignment())); + assert(IsSectorAligned(scratch, GetRequiredBufferAlignment())); + + Status s; + ssize_t r = -1; + size_t left = n; + char* ptr = scratch; + while (left > 0) { + r = pread(fd_, ptr, left, static_cast(offset)); + if (r <= 0) { + if (r == -1 && errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + if (r % static_cast(GetRequiredBufferAlignment()) != 0) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + break; + } + } + if (r < 0) { + // An error: return a non-ok status + s = IOError( + "While pread " + ToString(n) + " bytes from offset " + ToString(offset), + filename_, errno); + } + *result = Slice(scratch, (r < 0) ? 0 : n - left); + return s; +} + +Status PosixSequentialFile::Skip(uint64_t n) { + if (fseek(file_, static_cast(n), SEEK_CUR)) { + return IOError("While fseek to skip " + ToString(n) + " bytes", filename_, + errno); + } + return Status::OK(); +} + +Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) { +#ifndef OS_LINUX + (void)offset; + (void)length; + return Status::OK(); +#else + if (!use_direct_io()) { + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret != 0) { + return IOError("While fadvise NotNeeded offset " + ToString(offset) + + " len " + ToString(length), + filename_, errno); + } + } + return Status::OK(); +#endif +} + +/* + * PosixRandomAccessFile + */ +#if defined(OS_LINUX) +size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { + if (max_size < kMaxVarint64Length * 3) { + return 0; + } + + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return 0; + } + + long version = 0; + result = ioctl(fd, FS_IOC_GETVERSION, &version); + TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result); + if (result == -1) { + return 0; + } + uint64_t uversion = (uint64_t)version; + + char* rid = id; + rid = EncodeVarint64(rid, buf.st_dev); + rid = EncodeVarint64(rid, buf.st_ino); + rid = EncodeVarint64(rid, uversion); + assert(rid >= id); + return static_cast(rid - id); +} +#endif + +#if defined(OS_MACOSX) || defined(OS_AIX) +size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { + if (max_size < kMaxVarint64Length * 3) { + return 0; + } + + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return 0; + } + + char* rid = id; + rid = EncodeVarint64(rid, buf.st_dev); + rid = EncodeVarint64(rid, buf.st_ino); + rid = EncodeVarint64(rid, buf.st_gen); + assert(rid >= id); + return static_cast(rid - id); +} +#endif +/* + * PosixRandomAccessFile + * + * pread() based random-access + */ +PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd, + const EnvOptions& options) + : filename_(fname), + fd_(fd), + use_direct_io_(options.use_direct_reads), + logical_sector_size_(GetLogicalBufferSize(fd_)) { + assert(!options.use_direct_reads || !options.use_mmap_reads); + assert(!options.use_mmap_reads || sizeof(void*) < 8); +} + +PosixRandomAccessFile::~PosixRandomAccessFile() { close(fd_); } + +Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + if (use_direct_io()) { + assert(IsSectorAligned(offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(n, GetRequiredBufferAlignment())); + assert(IsSectorAligned(scratch, GetRequiredBufferAlignment())); + } + Status s; + ssize_t r = -1; + size_t left = n; + char* ptr = scratch; + while (left > 0) { + r = pread(fd_, ptr, left, static_cast(offset)); + if (r <= 0) { + if (r == -1 && errno == EINTR) { + continue; + } + break; + } + ptr += r; + offset += r; + left -= r; + if (use_direct_io() && + r % static_cast(GetRequiredBufferAlignment()) != 0) { + // Bytes reads don't fill sectors. Should only happen at the end + // of the file. + break; + } + } + if (r < 0) { + // An error: return a non-ok status + s = IOError( + "While pread offset " + ToString(offset) + " len " + ToString(n), + filename_, errno); + } + *result = Slice(scratch, (r < 0) ? 0 : n - left); + return s; +} + +Status PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n) { + Status s; + if (!use_direct_io()) { + ssize_t r = 0; +#ifdef OS_LINUX + r = readahead(fd_, offset, n); +#endif +#ifdef OS_MACOSX + radvisory advice; + advice.ra_offset = static_cast(offset); + advice.ra_count = static_cast(n); + r = fcntl(fd_, F_RDADVISE, &advice); +#endif + if (r == -1) { + s = IOError("While prefetching offset " + ToString(offset) + " len " + + ToString(n), + filename_, errno); + } + } + return s; +} + +#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX) +size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const { + return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size); +} +#endif + +void PosixRandomAccessFile::Hint(AccessPattern pattern) { + if (use_direct_io()) { + return; + } + switch (pattern) { + case NORMAL: + Fadvise(fd_, 0, 0, POSIX_FADV_NORMAL); + break; + case RANDOM: + Fadvise(fd_, 0, 0, POSIX_FADV_RANDOM); + break; + case SEQUENTIAL: + Fadvise(fd_, 0, 0, POSIX_FADV_SEQUENTIAL); + break; + case WILLNEED: + Fadvise(fd_, 0, 0, POSIX_FADV_WILLNEED); + break; + case DONTNEED: + Fadvise(fd_, 0, 0, POSIX_FADV_DONTNEED); + break; + default: + assert(false); + break; + } +} + +Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) { + if (use_direct_io()) { + return Status::OK(); + } +#ifndef OS_LINUX + (void)offset; + (void)length; + return Status::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return Status::OK(); + } + return IOError("While fadvise NotNeeded offset " + ToString(offset) + + " len " + ToString(length), + filename_, errno); +#endif +} + +/* + * PosixMmapReadableFile + * + * mmap() based random-access + */ +// base[0,length-1] contains the mmapped contents of the file. +PosixMmapReadableFile::PosixMmapReadableFile(const int fd, + const std::string& fname, + void* base, size_t length, + const EnvOptions& options) + : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) { +#ifdef NDEBUG + (void)options; +#endif + fd_ = fd_ + 0; // suppress the warning for used variables + assert(options.use_mmap_reads); + assert(!options.use_direct_reads); +} + +PosixMmapReadableFile::~PosixMmapReadableFile() { + int ret = munmap(mmapped_region_, length_); + if (ret != 0) { + fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n", + mmapped_region_, length_); + } + close(fd_); +} + +Status PosixMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result, + char* /*scratch*/) const { + Status s; + if (offset > length_) { + *result = Slice(); + return IOError("While mmap read offset " + ToString(offset) + + " larger than file length " + ToString(length_), + filename_, EINVAL); + } else if (offset + n > length_) { + n = static_cast(length_ - offset); + } + *result = Slice(reinterpret_cast(mmapped_region_) + offset, n); + return s; +} + +Status PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) { +#ifndef OS_LINUX + (void)offset; + (void)length; + return Status::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return Status::OK(); + } + return IOError("While fadvise not needed. Offset " + ToString(offset) + + " len" + ToString(length), + filename_, errno); +#endif +} + +/* + * PosixMmapFile + * + * We preallocate up to an extra megabyte and use memcpy to append new + * data to the file. This is safe since we either properly close the + * file before reading from it, or for log files, the reading code + * knows enough to skip zero suffixes. + */ +Status PosixMmapFile::UnmapCurrentRegion() { + TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds); + if (base_ != nullptr) { + int munmap_status = munmap(base_, limit_ - base_); + if (munmap_status != 0) { + return IOError("While munmap", filename_, munmap_status); + } + file_offset_ += limit_ - base_; + base_ = nullptr; + limit_ = nullptr; + last_sync_ = nullptr; + dst_ = nullptr; + + // Increase the amount we map the next time, but capped at 1MB + if (map_size_ < (1 << 20)) { + map_size_ *= 2; + } + } + return Status::OK(); +} + +Status PosixMmapFile::MapNewRegion() { +#ifdef ROCKSDB_FALLOCATE_PRESENT + assert(base_ == nullptr); + TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds); + // we can't fallocate with FALLOC_FL_KEEP_SIZE here + if (allow_fallocate_) { + IOSTATS_TIMER_GUARD(allocate_nanos); + int alloc_status = fallocate(fd_, 0, file_offset_, map_size_); + if (alloc_status != 0) { + // fallback to posix_fallocate + alloc_status = posix_fallocate(fd_, file_offset_, map_size_); + } + if (alloc_status != 0) { + return Status::IOError("Error allocating space to file : " + filename_ + + "Error : " + strerror(alloc_status)); + } + } + + TEST_KILL_RANDOM("PosixMmapFile::Append:1", rocksdb_kill_odds); + void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_, + file_offset_); + if (ptr == MAP_FAILED) { + return Status::IOError("MMap failed on " + filename_); + } + TEST_KILL_RANDOM("PosixMmapFile::Append:2", rocksdb_kill_odds); + + base_ = reinterpret_cast(ptr); + limit_ = base_ + map_size_; + dst_ = base_; + last_sync_ = base_; + return Status::OK(); +#else + return Status::NotSupported("This platform doesn't support fallocate()"); +#endif +} + +Status PosixMmapFile::Msync() { + if (dst_ == last_sync_) { + return Status::OK(); + } + // Find the beginnings of the pages that contain the first and last + // bytes to be synced. + size_t p1 = TruncateToPageBoundary(last_sync_ - base_); + size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1); + last_sync_ = dst_; + TEST_KILL_RANDOM("PosixMmapFile::Msync:0", rocksdb_kill_odds); + if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) { + return IOError("While msync", filename_, errno); + } + return Status::OK(); +} + +PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size, + const EnvOptions& options) + : filename_(fname), + fd_(fd), + page_size_(page_size), + map_size_(Roundup(65536, page_size)), + base_(nullptr), + limit_(nullptr), + dst_(nullptr), + last_sync_(nullptr), + file_offset_(0) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + allow_fallocate_ = options.allow_fallocate; + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#else + (void)options; +#endif + assert((page_size & (page_size - 1)) == 0); + assert(options.use_mmap_writes); + assert(!options.use_direct_writes); +} + +PosixMmapFile::~PosixMmapFile() { + if (fd_ >= 0) { + PosixMmapFile::Close(); + } +} + +Status PosixMmapFile::Append(const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + while (left > 0) { + assert(base_ <= dst_); + assert(dst_ <= limit_); + size_t avail = limit_ - dst_; + if (avail == 0) { + Status s = UnmapCurrentRegion(); + if (!s.ok()) { + return s; + } + s = MapNewRegion(); + if (!s.ok()) { + return s; + } + TEST_KILL_RANDOM("PosixMmapFile::Append:0", rocksdb_kill_odds); + } + + size_t n = (left <= avail) ? left : avail; + assert(dst_); + memcpy(dst_, src, n); + dst_ += n; + src += n; + left -= n; + } + return Status::OK(); +} + +Status PosixMmapFile::Close() { + Status s; + size_t unused = limit_ - dst_; + + s = UnmapCurrentRegion(); + if (!s.ok()) { + s = IOError("While closing mmapped file", filename_, errno); + } else if (unused > 0) { + // Trim the extra space at the end of the file + if (ftruncate(fd_, file_offset_ - unused) < 0) { + s = IOError("While ftruncating mmaped file", filename_, errno); + } + } + + if (close(fd_) < 0) { + if (s.ok()) { + s = IOError("While closing mmapped file", filename_, errno); + } + } + + fd_ = -1; + base_ = nullptr; + limit_ = nullptr; + return s; +} + +Status PosixMmapFile::Flush() { return Status::OK(); } + +Status PosixMmapFile::Sync() { + if (fdatasync(fd_) < 0) { + return IOError("While fdatasync mmapped file", filename_, errno); + } + + return Msync(); +} + +/** + * Flush data as well as metadata to stable storage. + */ +Status PosixMmapFile::Fsync() { + if (fsync(fd_) < 0) { + return IOError("While fsync mmaped file", filename_, errno); + } + + return Msync(); +} + +/** + * Get the size of valid data in the file. This will not match the + * size that is returned from the filesystem because we use mmap + * to extend file by map_size every time. + */ +uint64_t PosixMmapFile::GetFileSize() { + size_t used = dst_ - base_; + return file_offset_ + used; +} + +Status PosixMmapFile::InvalidateCache(size_t offset, size_t length) { +#ifndef OS_LINUX + (void)offset; + (void)length; + return Status::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return Status::OK(); + } + return IOError("While fadvise NotNeeded mmapped file", filename_, errno); +#endif +} + +#ifdef ROCKSDB_FALLOCATE_PRESENT +Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) { + assert(offset <= std::numeric_limits::max()); + assert(len <= std::numeric_limits::max()); + TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds); + int alloc_status = 0; + if (allow_fallocate_) { + alloc_status = fallocate( + fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast(offset), static_cast(len)); + } + if (alloc_status == 0) { + return Status::OK(); + } else { + return IOError( + "While fallocate offset " + ToString(offset) + " len " + ToString(len), + filename_, errno); + } +} +#endif + +/* + * PosixWritableFile + * + * Use posix write to write data to a file. + */ +PosixWritableFile::PosixWritableFile(const std::string& fname, int fd, + const EnvOptions& options) + : filename_(fname), + use_direct_io_(options.use_direct_writes), + fd_(fd), + filesize_(0), + logical_sector_size_(GetLogicalBufferSize(fd_)) { +#ifdef ROCKSDB_FALLOCATE_PRESENT + allow_fallocate_ = options.allow_fallocate; + fallocate_with_keep_size_ = options.fallocate_with_keep_size; +#endif + assert(!options.use_mmap_writes); +} + +PosixWritableFile::~PosixWritableFile() { + if (fd_ >= 0) { + PosixWritableFile::Close(); + } +} + +Status PosixWritableFile::Append(const Slice& data) { + if (use_direct_io()) { + assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment())); + assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment())); + } + const char* src = data.data(); + size_t left = data.size(); + while (left != 0) { + ssize_t done = write(fd_, src, left); + if (done < 0) { + if (errno == EINTR) { + continue; + } + return IOError("While appending to file", filename_, errno); + } + left -= done; + src += done; + } + filesize_ += data.size(); + return Status::OK(); +} + +Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) { + if (use_direct_io()) { + assert(IsSectorAligned(offset, GetRequiredBufferAlignment())); + assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment())); + assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment())); + } + assert(offset <= std::numeric_limits::max()); + const char* src = data.data(); + size_t left = data.size(); + while (left != 0) { + ssize_t done = pwrite(fd_, src, left, static_cast(offset)); + if (done < 0) { + if (errno == EINTR) { + continue; + } + return IOError("While pwrite to file at offset " + ToString(offset), + filename_, errno); + } + left -= done; + offset += done; + src += done; + } + filesize_ = offset; + return Status::OK(); +} + +Status PosixWritableFile::Truncate(uint64_t size) { + Status s; + int r = ftruncate(fd_, size); + if (r < 0) { + s = IOError("While ftruncate file to size " + ToString(size), filename_, + errno); + } else { + filesize_ = size; + } + return s; +} + +Status PosixWritableFile::Close() { + Status s; + + size_t block_size; + size_t last_allocated_block; + GetPreallocationStatus(&block_size, &last_allocated_block); + if (last_allocated_block > 0) { + // trim the extra space preallocated at the end of the file + // NOTE(ljin): we probably don't want to surface failure as an IOError, + // but it will be nice to log these errors. + int dummy __attribute__((__unused__)); + dummy = ftruncate(fd_, filesize_); +#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \ + !defined(TRAVIS) + // in some file systems, ftruncate only trims trailing space if the + // new file size is smaller than the current size. Calling fallocate + // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused + // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following + // filesystems: + // XFS (since Linux 2.6.38) + // ext4 (since Linux 3.0) + // Btrfs (since Linux 3.7) + // tmpfs (since Linux 3.5) + // We ignore error since failure of this operation does not affect + // correctness. + // TRAVIS - this code does not work on TRAVIS filesystems. + // the FALLOC_FL_KEEP_SIZE option is expected to not change the size + // of the file, but it does. Simple strace report will show that. + // While we work with Travis-CI team to figure out if this is a + // quirk of Docker/AUFS, we will comment this out. + struct stat file_stats; + int result = fstat(fd_, &file_stats); + // After ftruncate, we check whether ftruncate has the correct behavior. + // If not, we should hack it with FALLOC_FL_PUNCH_HOLE + if (result == 0 && + (file_stats.st_size + file_stats.st_blksize - 1) / + file_stats.st_blksize != + file_stats.st_blocks / (file_stats.st_blksize / 512)) { + IOSTATS_TIMER_GUARD(allocate_nanos); + if (allow_fallocate_) { + fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_, + block_size * last_allocated_block - filesize_); + } + } +#endif + } + + if (close(fd_) < 0) { + s = IOError("While closing file after writing", filename_, errno); + } + fd_ = -1; + return s; +} + +// write out the cached data to the OS cache +Status PosixWritableFile::Flush() { return Status::OK(); } + +Status PosixWritableFile::Sync() { + if (fdatasync(fd_) < 0) { + return IOError("While fdatasync", filename_, errno); + } + return Status::OK(); +} + +Status PosixWritableFile::Fsync() { + if (fsync(fd_) < 0) { + return IOError("While fsync", filename_, errno); + } + return Status::OK(); +} + +bool PosixWritableFile::IsSyncThreadSafe() const { return true; } + +uint64_t PosixWritableFile::GetFileSize() { return filesize_; } + +void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) { +#ifdef OS_LINUX +// Suppress Valgrind "Unimplemented functionality" error. +#ifndef ROCKSDB_VALGRIND_RUN + if (hint == write_hint_) { + return; + } + if (fcntl(fd_, F_SET_RW_HINT, &hint) == 0) { + write_hint_ = hint; + } +#else + (void)hint; +#endif // ROCKSDB_VALGRIND_RUN +#else + (void)hint; +#endif // OS_LINUX +} + +Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) { + if (use_direct_io()) { + return Status::OK(); + } +#ifndef OS_LINUX + (void)offset; + (void)length; + return Status::OK(); +#else + // free OS pages + int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED); + if (ret == 0) { + return Status::OK(); + } + return IOError("While fadvise NotNeeded", filename_, errno); +#endif +} + +#ifdef ROCKSDB_FALLOCATE_PRESENT +Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) { + assert(offset <= std::numeric_limits::max()); + assert(len <= std::numeric_limits::max()); + TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds); + IOSTATS_TIMER_GUARD(allocate_nanos); + int alloc_status = 0; + if (allow_fallocate_) { + alloc_status = fallocate( + fd_, fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0, + static_cast(offset), static_cast(len)); + } + if (alloc_status == 0) { + return Status::OK(); + } else { + return IOError( + "While fallocate offset " + ToString(offset) + " len " + ToString(len), + filename_, errno); + } +} +#endif + +#ifdef ROCKSDB_RANGESYNC_PRESENT +Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) { + assert(offset <= std::numeric_limits::max()); + assert(nbytes <= std::numeric_limits::max()); + if (sync_file_range(fd_, static_cast(offset), + static_cast(nbytes), SYNC_FILE_RANGE_WRITE) == 0) { + return Status::OK(); + } else { + return IOError("While sync_file_range offset " + ToString(offset) + + " bytes " + ToString(nbytes), + filename_, errno); + } +} +#endif + +#ifdef OS_LINUX +size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const { + return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size); +} +#endif + +/* + * PosixRandomRWFile + */ + +PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd, + const EnvOptions& /*options*/) + : filename_(fname), fd_(fd) {} + +PosixRandomRWFile::~PosixRandomRWFile() { + if (fd_ >= 0) { + Close(); + } +} + +Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) { + const char* src = data.data(); + size_t left = data.size(); + while (left != 0) { + ssize_t done = pwrite(fd_, src, left, offset); + if (done < 0) { + // error while writing to file + if (errno == EINTR) { + // write was interrupted, try again. + continue; + } + return IOError( + "While write random read/write file at offset " + ToString(offset), + filename_, errno); + } + + // Wrote `done` bytes + left -= done; + offset += done; + src += done; + } + + return Status::OK(); +} + +Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + size_t left = n; + char* ptr = scratch; + while (left > 0) { + ssize_t done = pread(fd_, ptr, left, offset); + if (done < 0) { + // error while reading from file + if (errno == EINTR) { + // read was interrupted, try again. + continue; + } + return IOError("While reading random read/write file offset " + + ToString(offset) + " len " + ToString(n), + filename_, errno); + } else if (done == 0) { + // Nothing more to read + break; + } + + // Read `done` bytes + ptr += done; + offset += done; + left -= done; + } + + *result = Slice(scratch, n - left); + return Status::OK(); +} + +Status PosixRandomRWFile::Flush() { return Status::OK(); } + +Status PosixRandomRWFile::Sync() { + if (fdatasync(fd_) < 0) { + return IOError("While fdatasync random read/write file", filename_, errno); + } + return Status::OK(); +} + +Status PosixRandomRWFile::Fsync() { + if (fsync(fd_) < 0) { + return IOError("While fsync random read/write file", filename_, errno); + } + return Status::OK(); +} + +Status PosixRandomRWFile::Close() { + if (close(fd_) < 0) { + return IOError("While close random read/write file", filename_, errno); + } + fd_ = -1; + return Status::OK(); +} + +PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() { + // TODO should have error handling though not much we can do... + munmap(this->base_, length_); +} + +/* + * PosixDirectory + */ + +PosixDirectory::~PosixDirectory() { close(fd_); } + +Status PosixDirectory::Fsync() { +#ifndef OS_AIX + if (fsync(fd_) == -1) { + return IOError("While fsync", "a directory", errno); + } +#endif + return Status::OK(); +} +} // namespace rocksdb +#endif diff --git a/src/rocksdb/env/io_posix.h b/src/rocksdb/env/io_posix.h new file mode 100644 index 00000000..e6824d3e --- /dev/null +++ b/src/rocksdb/env/io_posix.h @@ -0,0 +1,258 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once +#include +#include +#include +#include +#include "rocksdb/env.h" + +// For non linux platform, the following macros are used only as place +// holder. +#if !(defined OS_LINUX) && !(defined CYGWIN) && !(defined OS_AIX) +#define POSIX_FADV_NORMAL 0 /* [MC1] no further special treatment */ +#define POSIX_FADV_RANDOM 1 /* [MC1] expect random page refs */ +#define POSIX_FADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */ +#define POSIX_FADV_WILLNEED 3 /* [MC1] will need these pages */ +#define POSIX_FADV_DONTNEED 4 /* [MC1] dont need these pages */ +#endif + +namespace rocksdb { +static std::string IOErrorMsg(const std::string& context, + const std::string& file_name) { + if (file_name.empty()) { + return context; + } + return context + ": " + file_name; +} + +// file_name can be left empty if it is not unkown. +static Status IOError(const std::string& context, const std::string& file_name, + int err_number) { + switch (err_number) { + case ENOSPC: + return Status::NoSpace(IOErrorMsg(context, file_name), + strerror(err_number)); + case ESTALE: + return Status::IOError(Status::kStaleFile); + case ENOENT: + return Status::PathNotFound(IOErrorMsg(context, file_name), + strerror(err_number)); + default: + return Status::IOError(IOErrorMsg(context, file_name), + strerror(err_number)); + } +} + +class PosixHelper { + public: + static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size); +}; + +class PosixSequentialFile : public SequentialFile { + private: + std::string filename_; + FILE* file_; + int fd_; + bool use_direct_io_; + size_t logical_sector_size_; + + public: + PosixSequentialFile(const std::string& fname, FILE* file, int fd, + const EnvOptions& options); + virtual ~PosixSequentialFile(); + + virtual Status Read(size_t n, Slice* result, char* scratch) override; + virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result, + char* scratch) override; + virtual Status Skip(uint64_t n) override; + virtual Status InvalidateCache(size_t offset, size_t length) override; + virtual bool use_direct_io() const override { return use_direct_io_; } + virtual size_t GetRequiredBufferAlignment() const override { + return logical_sector_size_; + } +}; + +class PosixRandomAccessFile : public RandomAccessFile { + protected: + std::string filename_; + int fd_; + bool use_direct_io_; + size_t logical_sector_size_; + + public: + PosixRandomAccessFile(const std::string& fname, int fd, + const EnvOptions& options); + virtual ~PosixRandomAccessFile(); + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual Status Prefetch(uint64_t offset, size_t n) override; + +#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX) + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +#endif + virtual void Hint(AccessPattern pattern) override; + virtual Status InvalidateCache(size_t offset, size_t length) override; + virtual bool use_direct_io() const override { return use_direct_io_; } + virtual size_t GetRequiredBufferAlignment() const override { + return logical_sector_size_; + } +}; + +class PosixWritableFile : public WritableFile { + protected: + const std::string filename_; + const bool use_direct_io_; + int fd_; + uint64_t filesize_; + size_t logical_sector_size_; +#ifdef ROCKSDB_FALLOCATE_PRESENT + bool allow_fallocate_; + bool fallocate_with_keep_size_; +#endif + + public: + explicit PosixWritableFile(const std::string& fname, int fd, + const EnvOptions& options); + virtual ~PosixWritableFile(); + + // Need to implement this so the file is truncated correctly + // with direct I/O + virtual Status Truncate(uint64_t size) override; + virtual Status Close() override; + virtual Status Append(const Slice& data) override; + virtual Status PositionedAppend(const Slice& data, uint64_t offset) override; + virtual Status Flush() override; + virtual Status Sync() override; + virtual Status Fsync() override; + virtual bool IsSyncThreadSafe() const override; + virtual bool use_direct_io() const override { return use_direct_io_; } + virtual void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override; + virtual uint64_t GetFileSize() override; + virtual Status InvalidateCache(size_t offset, size_t length) override; + virtual size_t GetRequiredBufferAlignment() const override { + return logical_sector_size_; + } +#ifdef ROCKSDB_FALLOCATE_PRESENT + virtual Status Allocate(uint64_t offset, uint64_t len) override; +#endif +#ifdef ROCKSDB_RANGESYNC_PRESENT + virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override; +#endif +#ifdef OS_LINUX + virtual size_t GetUniqueId(char* id, size_t max_size) const override; +#endif +}; + +// mmap() based random-access +class PosixMmapReadableFile : public RandomAccessFile { + private: + int fd_; + std::string filename_; + void* mmapped_region_; + size_t length_; + + public: + PosixMmapReadableFile(const int fd, const std::string& fname, void* base, + size_t length, const EnvOptions& options); + virtual ~PosixMmapReadableFile(); + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + virtual Status InvalidateCache(size_t offset, size_t length) override; +}; + +class PosixMmapFile : public WritableFile { + private: + std::string filename_; + int fd_; + size_t page_size_; + size_t map_size_; // How much extra memory to map at a time + char* base_; // The mapped region + char* limit_; // Limit of the mapped region + char* dst_; // Where to write next (in range [base_,limit_]) + char* last_sync_; // Where have we synced up to + uint64_t file_offset_; // Offset of base_ in file +#ifdef ROCKSDB_FALLOCATE_PRESENT + bool allow_fallocate_; // If false, fallocate calls are bypassed + bool fallocate_with_keep_size_; +#endif + + // Roundup x to a multiple of y + static size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; } + + size_t TruncateToPageBoundary(size_t s) { + s -= (s & (page_size_ - 1)); + assert((s % page_size_) == 0); + return s; + } + + Status MapNewRegion(); + Status UnmapCurrentRegion(); + Status Msync(); + + public: + PosixMmapFile(const std::string& fname, int fd, size_t page_size, + const EnvOptions& options); + ~PosixMmapFile(); + + // Means Close() will properly take care of truncate + // and it does not need any additional information + virtual Status Truncate(uint64_t /*size*/) override { return Status::OK(); } + virtual Status Close() override; + virtual Status Append(const Slice& data) override; + virtual Status Flush() override; + virtual Status Sync() override; + virtual Status Fsync() override; + virtual uint64_t GetFileSize() override; + virtual Status InvalidateCache(size_t offset, size_t length) override; +#ifdef ROCKSDB_FALLOCATE_PRESENT + virtual Status Allocate(uint64_t offset, uint64_t len) override; +#endif +}; + +class PosixRandomRWFile : public RandomRWFile { + public: + explicit PosixRandomRWFile(const std::string& fname, int fd, + const EnvOptions& options); + virtual ~PosixRandomRWFile(); + + virtual Status Write(uint64_t offset, const Slice& data) override; + + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + + virtual Status Flush() override; + virtual Status Sync() override; + virtual Status Fsync() override; + virtual Status Close() override; + + private: + const std::string filename_; + int fd_; +}; + +struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer { + PosixMemoryMappedFileBuffer(void* _base, size_t _length) + : MemoryMappedFileBuffer(_base, _length) {} + virtual ~PosixMemoryMappedFileBuffer(); +}; + +class PosixDirectory : public Directory { + public: + explicit PosixDirectory(int fd) : fd_(fd) {} + ~PosixDirectory(); + virtual Status Fsync() override; + + private: + int fd_; +}; + +} // namespace rocksdb diff --git a/src/rocksdb/env/mock_env.cc b/src/rocksdb/env/mock_env.cc new file mode 100644 index 00000000..793a0837 --- /dev/null +++ b/src/rocksdb/env/mock_env.cc @@ -0,0 +1,775 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "env/mock_env.h" +#include +#include +#include "port/sys_time.h" +#include "util/cast_util.h" +#include "util/murmurhash.h" +#include "util/random.h" +#include "util/rate_limiter.h" + +namespace rocksdb { + +class MemFile { + public: + explicit MemFile(Env* env, const std::string& fn, bool _is_lock_file = false) + : env_(env), + fn_(fn), + refs_(0), + is_lock_file_(_is_lock_file), + locked_(false), + size_(0), + modified_time_(Now()), + rnd_(static_cast( + MurmurHash(fn.data(), static_cast(fn.size()), 0))), + fsynced_bytes_(0) {} + + void Ref() { + MutexLock lock(&mutex_); + ++refs_; + } + + bool is_lock_file() const { return is_lock_file_; } + + bool Lock() { + assert(is_lock_file_); + MutexLock lock(&mutex_); + if (locked_) { + return false; + } else { + locked_ = true; + return true; + } + } + + void Unlock() { + assert(is_lock_file_); + MutexLock lock(&mutex_); + locked_ = false; + } + + void Unref() { + bool do_delete = false; + { + MutexLock lock(&mutex_); + --refs_; + assert(refs_ >= 0); + if (refs_ <= 0) { + do_delete = true; + } + } + + if (do_delete) { + delete this; + } + } + + uint64_t Size() const { return size_; } + + void Truncate(size_t size) { + MutexLock lock(&mutex_); + if (size < size_) { + data_.resize(size); + size_ = size; + } + } + + void CorruptBuffer() { + if (fsynced_bytes_ >= size_) { + return; + } + uint64_t buffered_bytes = size_ - fsynced_bytes_; + uint64_t start = + fsynced_bytes_ + rnd_.Uniform(static_cast(buffered_bytes)); + uint64_t end = std::min(start + 512, size_.load()); + MutexLock lock(&mutex_); + for (uint64_t pos = start; pos < end; ++pos) { + data_[static_cast(pos)] = static_cast(rnd_.Uniform(256)); + } + } + + Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { + MutexLock lock(&mutex_); + const uint64_t available = Size() - std::min(Size(), offset); + size_t offset_ = static_cast(offset); + if (n > available) { + n = static_cast(available); + } + if (n == 0) { + *result = Slice(); + return Status::OK(); + } + if (scratch) { + memcpy(scratch, &(data_[offset_]), n); + *result = Slice(scratch, n); + } else { + *result = Slice(&(data_[offset_]), n); + } + return Status::OK(); + } + + Status Write(uint64_t offset, const Slice& data) { + MutexLock lock(&mutex_); + size_t offset_ = static_cast(offset); + if (offset + data.size() > data_.size()) { + data_.resize(offset_ + data.size()); + } + data_.replace(offset_, data.size(), data.data(), data.size()); + size_ = data_.size(); + modified_time_ = Now(); + return Status::OK(); + } + + Status Append(const Slice& data) { + MutexLock lock(&mutex_); + data_.append(data.data(), data.size()); + size_ = data_.size(); + modified_time_ = Now(); + return Status::OK(); + } + + Status Fsync() { + fsynced_bytes_ = size_.load(); + return Status::OK(); + } + + uint64_t ModifiedTime() const { return modified_time_; } + + private: + uint64_t Now() { + int64_t unix_time = 0; + auto s = env_->GetCurrentTime(&unix_time); + assert(s.ok()); + return static_cast(unix_time); + } + + // Private since only Unref() should be used to delete it. + ~MemFile() { assert(refs_ == 0); } + + // No copying allowed. + MemFile(const MemFile&); + void operator=(const MemFile&); + + Env* env_; + const std::string fn_; + mutable port::Mutex mutex_; + int refs_; + bool is_lock_file_; + bool locked_; + + // Data written into this file, all bytes before fsynced_bytes are + // persistent. + std::string data_; + std::atomic size_; + std::atomic modified_time_; + + Random rnd_; + std::atomic fsynced_bytes_; +}; + +namespace { + +class MockSequentialFile : public SequentialFile { + public: + explicit MockSequentialFile(MemFile* file) : file_(file), pos_(0) { + file_->Ref(); + } + + ~MockSequentialFile() override { file_->Unref(); } + + Status Read(size_t n, Slice* result, char* scratch) override { + Status s = file_->Read(pos_, n, result, scratch); + if (s.ok()) { + pos_ += result->size(); + } + return s; + } + + Status Skip(uint64_t n) override { + if (pos_ > file_->Size()) { + return Status::IOError("pos_ > file_->Size()"); + } + const uint64_t available = file_->Size() - pos_; + if (n > available) { + n = available; + } + pos_ += static_cast(n); + return Status::OK(); + } + + private: + MemFile* file_; + size_t pos_; +}; + +class MockRandomAccessFile : public RandomAccessFile { + public: + explicit MockRandomAccessFile(MemFile* file) : file_(file) { file_->Ref(); } + + ~MockRandomAccessFile() override { file_->Unref(); } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return file_->Read(offset, n, result, scratch); + } + + private: + MemFile* file_; +}; + +class MockRandomRWFile : public RandomRWFile { + public: + explicit MockRandomRWFile(MemFile* file) : file_(file) { file_->Ref(); } + + ~MockRandomRWFile() override { file_->Unref(); } + + Status Write(uint64_t offset, const Slice& data) override { + return file_->Write(offset, data); + } + + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override { + return file_->Read(offset, n, result, scratch); + } + + Status Close() override { return file_->Fsync(); } + + Status Flush() override { return Status::OK(); } + + Status Sync() override { return file_->Fsync(); } + + private: + MemFile* file_; +}; + +class MockWritableFile : public WritableFile { + public: + MockWritableFile(MemFile* file, RateLimiter* rate_limiter) + : file_(file), rate_limiter_(rate_limiter) { + file_->Ref(); + } + + ~MockWritableFile() override { file_->Unref(); } + + Status Append(const Slice& data) override { + size_t bytes_written = 0; + while (bytes_written < data.size()) { + auto bytes = RequestToken(data.size() - bytes_written); + Status s = file_->Append(Slice(data.data() + bytes_written, bytes)); + if (!s.ok()) { + return s; + } + bytes_written += bytes; + } + return Status::OK(); + } + Status Truncate(uint64_t size) override { + file_->Truncate(static_cast(size)); + return Status::OK(); + } + Status Close() override { return file_->Fsync(); } + + Status Flush() override { return Status::OK(); } + + Status Sync() override { return file_->Fsync(); } + + uint64_t GetFileSize() override { return file_->Size(); } + + private: + inline size_t RequestToken(size_t bytes) { + if (rate_limiter_ && io_priority_ < Env::IO_TOTAL) { + bytes = std::min( + bytes, static_cast(rate_limiter_->GetSingleBurstBytes())); + rate_limiter_->Request(bytes, io_priority_); + } + return bytes; + } + + MemFile* file_; + RateLimiter* rate_limiter_; +}; + +class MockEnvDirectory : public Directory { + public: + Status Fsync() override { return Status::OK(); } +}; + +class MockEnvFileLock : public FileLock { + public: + explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {} + + std::string FileName() const { return fname_; } + + private: + const std::string fname_; +}; + +class TestMemLogger : public Logger { + private: + std::unique_ptr file_; + std::atomic_size_t log_size_; + static const uint64_t flush_every_seconds_ = 5; + std::atomic_uint_fast64_t last_flush_micros_; + Env* env_; + std::atomic flush_pending_; + + public: + TestMemLogger(std::unique_ptr f, Env* env, + const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) + : Logger(log_level), + file_(std::move(f)), + log_size_(0), + last_flush_micros_(0), + env_(env), + flush_pending_(false) {} + ~TestMemLogger() override {} + + void Flush() override { + if (flush_pending_) { + flush_pending_ = false; + } + last_flush_micros_ = env_->NowMicros(); + } + + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 30000; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + memset(&t, 0, sizeof(t)); + struct tm* ret __attribute__((__unused__)); + ret = localtime_r(&seconds, &t); + assert(ret); + p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, + t.tm_min, t.tm_sec, static_cast(now_tv.tv_usec)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + const size_t write_size = p - base; + + file_->Append(Slice(base, write_size)); + flush_pending_ = true; + log_size_ += write_size; + uint64_t now_micros = + static_cast(now_tv.tv_sec) * 1000000 + now_tv.tv_usec; + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + flush_pending_ = false; + last_flush_micros_ = now_micros; + } + if (base != buffer) { + delete[] base; + } + break; + } + } + size_t GetLogFileSize() const override { return log_size_; } +}; + +} // Anonymous namespace + +MockEnv::MockEnv(Env* base_env) : EnvWrapper(base_env), fake_sleep_micros_(0) {} + +MockEnv::~MockEnv() { + for (FileSystem::iterator i = file_map_.begin(); i != file_map_.end(); ++i) { + i->second->Unref(); + } +} + +// Partial implementation of the Env interface. +Status MockEnv::NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& /*soptions*/) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + *result = nullptr; + return Status::IOError(fn, "File not found"); + } + auto* f = file_map_[fn]; + if (f->is_lock_file()) { + return Status::InvalidArgument(fn, "Cannot open a lock file."); + } + result->reset(new MockSequentialFile(f)); + return Status::OK(); +} + +Status MockEnv::NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& /*soptions*/) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + *result = nullptr; + return Status::IOError(fn, "File not found"); + } + auto* f = file_map_[fn]; + if (f->is_lock_file()) { + return Status::InvalidArgument(fn, "Cannot open a lock file."); + } + result->reset(new MockRandomAccessFile(f)); + return Status::OK(); +} + +Status MockEnv::NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& /*soptions*/) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + *result = nullptr; + return Status::IOError(fn, "File not found"); + } + auto* f = file_map_[fn]; + if (f->is_lock_file()) { + return Status::InvalidArgument(fn, "Cannot open a lock file."); + } + result->reset(new MockRandomRWFile(f)); + return Status::OK(); +} + +Status MockEnv::ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) { + auto s = RenameFile(old_fname, fname); + if (!s.ok()) { + return s; + } + result->reset(); + return NewWritableFile(fname, result, options); +} + +Status MockEnv::NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& env_options) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + DeleteFileInternal(fn); + } + MemFile* file = new MemFile(this, fn, false); + file->Ref(); + file_map_[fn] = file; + + result->reset(new MockWritableFile(file, env_options.rate_limiter)); + return Status::OK(); +} + +Status MockEnv::NewDirectory(const std::string& /*name*/, + std::unique_ptr* result) { + result->reset(new MockEnvDirectory()); + return Status::OK(); +} + +Status MockEnv::FileExists(const std::string& fname) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + // File exists + return Status::OK(); + } + // Now also check if fn exists as a dir + for (const auto& iter : file_map_) { + const std::string& filename = iter.first; + if (filename.size() >= fn.size() + 1 && filename[fn.size()] == '/' && + Slice(filename).starts_with(Slice(fn))) { + return Status::OK(); + } + } + return Status::NotFound(); +} + +Status MockEnv::GetChildren(const std::string& dir, + std::vector* result) { + auto d = NormalizePath(dir); + bool found_dir = false; + { + MutexLock lock(&mutex_); + result->clear(); + for (const auto& iter : file_map_) { + const std::string& filename = iter.first; + + if (filename == d) { + found_dir = true; + } else if (filename.size() >= d.size() + 1 && filename[d.size()] == '/' && + Slice(filename).starts_with(Slice(d))) { + found_dir = true; + size_t next_slash = filename.find('/', d.size() + 1); + if (next_slash != std::string::npos) { + result->push_back( + filename.substr(d.size() + 1, next_slash - d.size() - 1)); + } else { + result->push_back(filename.substr(d.size() + 1)); + } + } + } + } + result->erase(std::unique(result->begin(), result->end()), result->end()); + return found_dir ? Status::OK() : Status::NotFound(); +} + +void MockEnv::DeleteFileInternal(const std::string& fname) { + assert(fname == NormalizePath(fname)); + const auto& pair = file_map_.find(fname); + if (pair != file_map_.end()) { + pair->second->Unref(); + file_map_.erase(fname); + } +} + +Status MockEnv::DeleteFile(const std::string& fname) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + if (file_map_.find(fn) == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + + DeleteFileInternal(fn); + return Status::OK(); +} + +Status MockEnv::Truncate(const std::string& fname, size_t size) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + iter->second->Truncate(size); + return Status::OK(); +} + +Status MockEnv::CreateDir(const std::string& dirname) { + auto dn = NormalizePath(dirname); + if (file_map_.find(dn) == file_map_.end()) { + MemFile* file = new MemFile(this, dn, false); + file->Ref(); + file_map_[dn] = file; + } else { + return Status::IOError(); + } + return Status::OK(); +} + +Status MockEnv::CreateDirIfMissing(const std::string& dirname) { + CreateDir(dirname); + return Status::OK(); +} + +Status MockEnv::DeleteDir(const std::string& dirname) { + return DeleteFile(dirname); +} + +Status MockEnv::GetFileSize(const std::string& fname, uint64_t* file_size) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + + *file_size = iter->second->Size(); + return Status::OK(); +} + +Status MockEnv::GetFileModificationTime(const std::string& fname, + uint64_t* time) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + *time = iter->second->ModifiedTime(); + return Status::OK(); +} + +Status MockEnv::RenameFile(const std::string& src, const std::string& dest) { + auto s = NormalizePath(src); + auto t = NormalizePath(dest); + MutexLock lock(&mutex_); + if (file_map_.find(s) == file_map_.end()) { + return Status::IOError(s, "File not found"); + } + + DeleteFileInternal(t); + file_map_[t] = file_map_[s]; + file_map_.erase(s); + return Status::OK(); +} + +Status MockEnv::LinkFile(const std::string& src, const std::string& dest) { + auto s = NormalizePath(src); + auto t = NormalizePath(dest); + MutexLock lock(&mutex_); + if (file_map_.find(s) == file_map_.end()) { + return Status::IOError(s, "File not found"); + } + + DeleteFileInternal(t); + file_map_[t] = file_map_[s]; + file_map_[t]->Ref(); // Otherwise it might get deleted when noone uses s + return Status::OK(); +} + +Status MockEnv::NewLogger(const std::string& fname, + std::shared_ptr* result) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + MemFile* file = nullptr; + if (iter == file_map_.end()) { + file = new MemFile(this, fn, false); + file->Ref(); + file_map_[fn] = file; + } else { + file = iter->second; + } + std::unique_ptr f(new MockWritableFile(file, nullptr)); + result->reset(new TestMemLogger(std::move(f), this)); + return Status::OK(); +} + +Status MockEnv::LockFile(const std::string& fname, FileLock** flock) { + auto fn = NormalizePath(fname); + { + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + if (!file_map_[fn]->is_lock_file()) { + return Status::InvalidArgument(fname, "Not a lock file."); + } + if (!file_map_[fn]->Lock()) { + return Status::IOError(fn, "Lock is already held."); + } + } else { + auto* file = new MemFile(this, fn, true); + file->Ref(); + file->Lock(); + file_map_[fn] = file; + } + } + *flock = new MockEnvFileLock(fn); + return Status::OK(); +} + +Status MockEnv::UnlockFile(FileLock* flock) { + std::string fn = + static_cast_with_check(flock)->FileName(); + { + MutexLock lock(&mutex_); + if (file_map_.find(fn) != file_map_.end()) { + if (!file_map_[fn]->is_lock_file()) { + return Status::InvalidArgument(fn, "Not a lock file."); + } + file_map_[fn]->Unlock(); + } + } + delete flock; + return Status::OK(); +} + +Status MockEnv::GetTestDirectory(std::string* path) { + *path = "/test"; + return Status::OK(); +} + +Status MockEnv::GetCurrentTime(int64_t* unix_time) { + auto s = EnvWrapper::GetCurrentTime(unix_time); + if (s.ok()) { + *unix_time += fake_sleep_micros_.load() / (1000 * 1000); + } + return s; +} + +uint64_t MockEnv::NowMicros() { + return EnvWrapper::NowMicros() + fake_sleep_micros_.load(); +} + +uint64_t MockEnv::NowNanos() { + return EnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000; +} + +Status MockEnv::CorruptBuffer(const std::string& fname) { + auto fn = NormalizePath(fname); + MutexLock lock(&mutex_); + auto iter = file_map_.find(fn); + if (iter == file_map_.end()) { + return Status::IOError(fn, "File not found"); + } + iter->second->CorruptBuffer(); + return Status::OK(); +} + +std::string MockEnv::NormalizePath(const std::string path) { + std::string dst; + for (auto c : path) { + if (!dst.empty() && c == '/' && dst.back() == '/') { + continue; + } + dst.push_back(c); + } + return dst; +} + +void MockEnv::FakeSleepForMicroseconds(int64_t micros) { + fake_sleep_micros_.fetch_add(micros); +} + +#ifndef ROCKSDB_LITE +// This is to maintain the behavior before swithcing from InMemoryEnv to MockEnv +Env* NewMemEnv(Env* base_env) { return new MockEnv(base_env); } + +#else // ROCKSDB_LITE + +Env* NewMemEnv(Env* /*base_env*/) { return nullptr; } + +#endif // !ROCKSDB_LITE + +} // namespace rocksdb diff --git a/src/rocksdb/env/mock_env.h b/src/rocksdb/env/mock_env.h new file mode 100644 index 00000000..87b8deaf --- /dev/null +++ b/src/rocksdb/env/mock_env.h @@ -0,0 +1,114 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#pragma once + +#include +#include +#include +#include +#include "rocksdb/env.h" +#include "rocksdb/status.h" +#include "port/port.h" +#include "util/mutexlock.h" + +namespace rocksdb { + +class MemFile; +class MockEnv : public EnvWrapper { + public: + explicit MockEnv(Env* base_env); + + virtual ~MockEnv(); + + // Partial implementation of the Env interface. + virtual Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& soptions) override; + + virtual Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& soptions) override; + + virtual Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + virtual Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override; + + virtual Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& env_options) override; + + virtual Status NewDirectory(const std::string& name, + std::unique_ptr* result) override; + + virtual Status FileExists(const std::string& fname) override; + + virtual Status GetChildren(const std::string& dir, + std::vector* result) override; + + void DeleteFileInternal(const std::string& fname); + + virtual Status DeleteFile(const std::string& fname) override; + + virtual Status Truncate(const std::string& fname, size_t size) override; + + virtual Status CreateDir(const std::string& dirname) override; + + virtual Status CreateDirIfMissing(const std::string& dirname) override; + + virtual Status DeleteDir(const std::string& dirname) override; + + virtual Status GetFileSize(const std::string& fname, + uint64_t* file_size) override; + + virtual Status GetFileModificationTime(const std::string& fname, + uint64_t* time) override; + + virtual Status RenameFile(const std::string& src, + const std::string& target) override; + + virtual Status LinkFile(const std::string& src, + const std::string& target) override; + + virtual Status NewLogger(const std::string& fname, + std::shared_ptr* result) override; + + virtual Status LockFile(const std::string& fname, FileLock** flock) override; + + virtual Status UnlockFile(FileLock* flock) override; + + virtual Status GetTestDirectory(std::string* path) override; + + // Results of these can be affected by FakeSleepForMicroseconds() + virtual Status GetCurrentTime(int64_t* unix_time) override; + virtual uint64_t NowMicros() override; + virtual uint64_t NowNanos() override; + + Status CorruptBuffer(const std::string& fname); + + // Doesn't really sleep, just affects output of GetCurrentTime(), NowMicros() + // and NowNanos() + void FakeSleepForMicroseconds(int64_t micros); + + private: + std::string NormalizePath(const std::string path); + + // Map from filenames to MemFile objects, representing a simple file system. + typedef std::map FileSystem; + port::Mutex mutex_; + FileSystem file_map_; // Protected by mutex_. + + std::atomic fake_sleep_micros_; +}; + +} // namespace rocksdb diff --git a/src/rocksdb/env/mock_env_test.cc b/src/rocksdb/env/mock_env_test.cc new file mode 100644 index 00000000..2daf682e --- /dev/null +++ b/src/rocksdb/env/mock_env_test.cc @@ -0,0 +1,83 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "env/mock_env.h" + +#include +#include + +#include "rocksdb/env.h" +#include "util/testharness.h" + +namespace rocksdb { + +class MockEnvTest : public testing::Test { + public: + MockEnv* env_; + const EnvOptions soptions_; + + MockEnvTest() + : env_(new MockEnv(Env::Default())) { + } + ~MockEnvTest() override { delete env_; } +}; + +TEST_F(MockEnvTest, Corrupt) { + const std::string kGood = "this is a good string, synced to disk"; + const std::string kCorrupted = "this part may be corrupted"; + const std::string kFileName = "/dir/f"; + std::unique_ptr writable_file; + ASSERT_OK(env_->NewWritableFile(kFileName, &writable_file, soptions_)); + ASSERT_OK(writable_file->Append(kGood)); + ASSERT_TRUE(writable_file->GetFileSize() == kGood.size()); + + std::string scratch; + scratch.resize(kGood.size() + kCorrupted.size() + 16); + Slice result; + std::unique_ptr rand_file; + ASSERT_OK(env_->NewRandomAccessFile(kFileName, &rand_file, soptions_)); + ASSERT_OK(rand_file->Read(0, kGood.size(), &result, &(scratch[0]))); + ASSERT_EQ(result.compare(kGood), 0); + + // Sync + corrupt => no change + ASSERT_OK(writable_file->Fsync()); + ASSERT_OK(dynamic_cast(env_)->CorruptBuffer(kFileName)); + result.clear(); + ASSERT_OK(rand_file->Read(0, kGood.size(), &result, &(scratch[0]))); + ASSERT_EQ(result.compare(kGood), 0); + + // Add new data and corrupt it + ASSERT_OK(writable_file->Append(kCorrupted)); + ASSERT_TRUE(writable_file->GetFileSize() == kGood.size() + kCorrupted.size()); + result.clear(); + ASSERT_OK(rand_file->Read(kGood.size(), kCorrupted.size(), + &result, &(scratch[0]))); + ASSERT_EQ(result.compare(kCorrupted), 0); + // Corrupted + ASSERT_OK(dynamic_cast(env_)->CorruptBuffer(kFileName)); + result.clear(); + ASSERT_OK(rand_file->Read(kGood.size(), kCorrupted.size(), + &result, &(scratch[0]))); + ASSERT_NE(result.compare(kCorrupted), 0); +} + +TEST_F(MockEnvTest, FakeSleeping) { + int64_t now = 0; + auto s = env_->GetCurrentTime(&now); + ASSERT_OK(s); + env_->FakeSleepForMicroseconds(3 * 1000 * 1000); + int64_t after_sleep = 0; + s = env_->GetCurrentTime(&after_sleep); + ASSERT_OK(s); + auto delta = after_sleep - now; + // this will be true unless test runs for 2 seconds + ASSERT_TRUE(delta == 3 || delta == 4); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/env/posix_logger.h b/src/rocksdb/env/posix_logger.h new file mode 100644 index 00000000..401df6a3 --- /dev/null +++ b/src/rocksdb/env/posix_logger.h @@ -0,0 +1,185 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#pragma once +#include +#include +#include "port/sys_time.h" +#include +#include + +#ifdef OS_LINUX +#ifndef FALLOC_FL_KEEP_SIZE +#include +#endif +#endif + +#include +#include "env/io_posix.h" +#include "monitoring/iostats_context_imp.h" +#include "rocksdb/env.h" +#include "util/sync_point.h" + +namespace rocksdb { + +class PosixLogger : public Logger { + private: + Status PosixCloseHelper() { + int ret; + + ret = fclose(file_); + if (ret) { + return IOError("Unable to close log file", "", ret); + } + return Status::OK(); + } + FILE* file_; + uint64_t (*gettid_)(); // Return the thread id for the current thread + std::atomic_size_t log_size_; + int fd_; + const static uint64_t flush_every_seconds_ = 5; + std::atomic_uint_fast64_t last_flush_micros_; + Env* env_; + std::atomic flush_pending_; + + protected: + virtual Status CloseImpl() override { return PosixCloseHelper(); } + + public: + PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env, + const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) + : Logger(log_level), + file_(f), + gettid_(gettid), + log_size_(0), + fd_(fileno(f)), + last_flush_micros_(0), + env_(env), + flush_pending_(false) {} + virtual ~PosixLogger() { + if (!closed_) { + closed_ = true; + PosixCloseHelper(); + } + } + virtual void Flush() override { + TEST_SYNC_POINT("PosixLogger::Flush:Begin1"); + TEST_SYNC_POINT("PosixLogger::Flush:Begin2"); + if (flush_pending_) { + flush_pending_ = false; + fflush(file_); + } + last_flush_micros_ = env_->NowMicros(); + } + + using Logger::Logv; + virtual void Logv(const char* format, va_list ap) override { + IOSTATS_TIMER_GUARD(logger_nanos); + + const uint64_t thread_id = (*gettid_)(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 65536; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + p += snprintf(p, limit - p, + "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec, + static_cast(now_tv.tv_usec), + static_cast(thread_id)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + const size_t write_size = p - base; + +#ifdef ROCKSDB_FALLOCATE_PRESENT + const int kDebugLogChunkSize = 128 * 1024; + + // If this write would cross a boundary of kDebugLogChunkSize + // space, pre-allocate more space to avoid overly large + // allocations from filesystem allocsize options. + const size_t log_size = log_size_; + const size_t last_allocation_chunk = + ((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize); + const size_t desired_allocation_chunk = + ((kDebugLogChunkSize - 1 + log_size + write_size) / + kDebugLogChunkSize); + if (last_allocation_chunk != desired_allocation_chunk) { + fallocate( + fd_, FALLOC_FL_KEEP_SIZE, 0, + static_cast(desired_allocation_chunk * kDebugLogChunkSize)); + } +#endif + + size_t sz = fwrite(base, 1, write_size, file_); + flush_pending_ = true; + if (sz > 0) { + log_size_ += write_size; + } + uint64_t now_micros = static_cast(now_tv.tv_sec) * 1000000 + + now_tv.tv_usec; + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + Flush(); + } + if (base != buffer) { + delete[] base; + } + break; + } + } + size_t GetLogFileSize() const override { return log_size_; } +}; + +} // namespace rocksdb -- cgit v1.2.3