summary | refs | log | tree | commit | diff | stats
path: root/src/rocksdb/env
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r-- src/rocksdb/env/env.cc 425
-rw-r--r-- src/rocksdb/env/env_basic_test.cc 354
-rw-r--r-- src/rocksdb/env/env_chroot.cc 321
-rw-r--r-- src/rocksdb/env/env_chroot.h 22
-rw-r--r-- src/rocksdb/env/env_encryption.cc 927
-rw-r--r-- src/rocksdb/env/env_hdfs.cc 627
-rw-r--r-- src/rocksdb/env/env_posix.cc 1128
-rw-r--r-- src/rocksdb/env/env_test.cc 1774
-rw-r--r-- src/rocksdb/env/io_posix.cc 1082
-rw-r--r-- src/rocksdb/env/io_posix.h 258
-rw-r--r-- src/rocksdb/env/mock_env.cc 775
-rw-r--r-- src/rocksdb/env/mock_env.h 114
-rw-r--r-- src/rocksdb/env/mock_env_test.cc 83
-rw-r--r-- src/rocksdb/env/posix_logger.h 185
14 files changed, 8075 insertions, 0 deletions
diff --git a/src/rocksdb/env/env.cc b/src/rocksdb/env/env.cc
new file mode 100644
index 00000000..fde03577
--- /dev/null
+++ b/src/rocksdb/env/env.cc
@@ -0,0 +1,425 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "rocksdb/env.h"
+
+#include <thread>
+#include "options/db_options.h"
+#include "port/port.h"
+#include "port/sys_time.h"
+#include "rocksdb/options.h"
+#include "util/arena.h"
+#include "util/autovector.h"
+
+namespace rocksdb {
+
+// Out-of-line destructor definition; there is nothing to release here.
+Env::~Env() = default;
+
+// Returns the display name of an Env::Priority value. TOTAL is not expected
+// here: it trips an assert in debug builds and otherwise, like any value not
+// listed, yields "Invalid".
+std::string Env::PriorityToString(Env::Priority priority) {
+  const char* label = "Invalid";
+  switch (priority) {
+    case Env::Priority::BOTTOM:
+      label = "Bottom";
+      break;
+    case Env::Priority::LOW:
+      label = "Low";
+      break;
+    case Env::Priority::HIGH:
+      label = "High";
+      break;
+    case Env::Priority::USER:
+      label = "User";
+      break;
+    case Env::Priority::TOTAL:
+      assert(false);
+      break;
+  }
+  return label;
+}
+
+// Returns a numeric identifier for the calling thread, obtained by hashing
+// std::this_thread::get_id().
+uint64_t Env::GetThreadID() const {
+  return std::hash<std::thread::id>()(std::this_thread::get_id());
+}
+
+// Default implementation: renames `old_fname` to `fname` and then opens
+// `fname` through NewWritableFile(). A failed rename is returned as-is and no
+// file is opened.
+Status Env::ReuseWritableFile(const std::string& fname,
+                              const std::string& old_fname,
+                              std::unique_ptr<WritableFile>* result,
+                              const EnvOptions& options) {
+  Status s = RenameFile(old_fname, fname);
+  if (s.ok()) {
+    s = NewWritableFile(fname, result, options);
+  }
+  return s;
+}
+
+// Lists `dir` via GetChildren() and fills `result` with one entry (name and
+// size in bytes) per child. Children whose size cannot be read AND that no
+// longer exist are skipped; any other size failure aborts with that status.
+Status Env::GetChildrenFileAttributes(const std::string& dir,
+                                      std::vector<FileAttributes>* result) {
+  assert(result != nullptr);
+  std::vector<std::string> names;
+  Status s = GetChildren(dir, &names);
+  if (!s.ok()) {
+    return s;
+  }
+  // Size for the worst case up front; trimmed to the real count at the end.
+  result->resize(names.size());
+  size_t kept = 0;
+  for (std::string& name : names) {
+    const std::string path = dir + "/" + name;
+    FileAttributes& slot = (*result)[kept];
+    s = GetFileSize(path, &slot.size_bytes);
+    if (!s.ok()) {
+      if (FileExists(path).IsNotFound()) {
+        // The entry vanished between the listing and the size query.
+        continue;
+      }
+      return s;
+    }
+    slot.name = std::move(name);
+    ++kept;
+  }
+  result->resize(kept);
+  return Status::OK();
+}
+
+// Out-of-line destructor definitions for the file and logger interfaces; none
+// of them has any work to do.
+SequentialFile::~SequentialFile() = default;
+
+RandomAccessFile::~RandomAccessFile() = default;
+
+WritableFile::~WritableFile() = default;
+
+MemoryMappedFileBuffer::~MemoryMappedFileBuffer() = default;
+
+Logger::~Logger() = default;
+
+// Closes the logger at most once: the first call marks it closed and returns
+// CloseImpl()'s status; every later call returns OK without calling CloseImpl().
+Status Logger::Close() {
+  if (closed_) {
+    return Status::OK();
+  }
+  closed_ = true;
+  return CloseImpl();
+}
+
+// Base-class close hook: reports NotSupported.
+Status Logger::CloseImpl() {
+  return Status::NotSupported();
+}
+
+// Out-of-line destructor definition; nothing to release.
+FileLock::~FileLock() = default;
+
+// Flushes `info_log` if one was supplied; a null logger is silently ignored.
+void LogFlush(Logger* info_log) {
+  if (info_log == nullptr) {
+    return;
+  }
+  info_log->Flush();
+}
+
+// Forwards an INFO-level message to `info_log`, unless the logger is null or
+// its threshold is above INFO.
+static void Logv(Logger* info_log, const char* format, va_list ap) {
+  if (info_log == nullptr ||
+      info_log->GetInfoLogLevel() > InfoLogLevel::INFO_LEVEL) {
+    return;
+  }
+  info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
+}
+
+// printf-style INFO logging through a raw Logger pointer (null is tolerated).
+void Log(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Logv(info_log, format, args);
+  va_end(args);
+}
+
+// Level-aware entry point. Messages below this logger's threshold are dropped.
+// INFO messages go out unprefixed, HEADER messages go to LogHeader(), and
+// every other level is emitted with a "[LEVEL] " prefix prepended to the
+// format string.
+void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
+  if (log_level < log_level_) {
+    return;
+  }
+
+  if (log_level == InfoLogLevel::INFO_LEVEL) {
+    // INFO is deliberately left unprefixed: logging predating log levels was
+    // all INFO, and it should not pay for the extra formatting step.
+    Logv(format, ap);
+    return;
+  }
+
+  if (log_level == InfoLogLevel::HEADER_LEVEL) {
+    LogHeader(format, ap);
+    return;
+  }
+
+  static const char* kLevelTags[5] = { "DEBUG", "INFO", "WARN",
+                                       "ERROR", "FATAL" };
+  char prefixed_format[500];
+  snprintf(prefixed_format, sizeof(prefixed_format) - 1, "[%s] %s",
+           kLevelTags[log_level], format);
+  Logv(prefixed_format, ap);
+}
+
+// Sends a message at `log_level` to `info_log` when the logger is non-null and
+// its threshold admits that level. HEADER messages are routed to LogHeader().
+static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) {
+  if (info_log == nullptr || info_log->GetInfoLogLevel() > log_level) {
+    return;
+  }
+  if (log_level == InfoLogLevel::HEADER_LEVEL) {
+    info_log->LogHeader(format, ap);
+    return;
+  }
+  info_log->Logv(log_level, format, ap);
+}
+
+// printf-style logging at an explicit level through a raw Logger pointer.
+void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
+         ...) {
+  va_list args;
+  va_start(args, format);
+  Logv(log_level, info_log, format, args);
+  va_end(args);
+}
+
+// Passes a header message to `info_log`; a null logger is ignored. No level
+// check is made here.
+static void Headerv(Logger *info_log, const char *format, va_list ap) {
+  if (info_log == nullptr) {
+    return;
+  }
+  info_log->LogHeader(format, ap);
+}
+
+// printf-style header logging through a raw Logger pointer.
+void Header(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Headerv(info_log, format, args);
+  va_end(args);
+}
+
+// Emits a DEBUG-level message unless the logger is null or its threshold is
+// above DEBUG.
+static void Debugv(Logger* info_log, const char* format, va_list ap) {
+  if (info_log == nullptr ||
+      info_log->GetInfoLogLevel() > InfoLogLevel::DEBUG_LEVEL) {
+    return;
+  }
+  info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
+}
+
+// printf-style DEBUG logging through a raw Logger pointer.
+void Debug(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Debugv(info_log, format, args);
+  va_end(args);
+}
+
+// Emits an INFO-level message unless the logger is null or its threshold is
+// above INFO.
+static void Infov(Logger* info_log, const char* format, va_list ap) {
+  if (info_log == nullptr ||
+      info_log->GetInfoLogLevel() > InfoLogLevel::INFO_LEVEL) {
+    return;
+  }
+  info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
+}
+
+// printf-style INFO logging through a raw Logger pointer.
+void Info(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Infov(info_log, format, args);
+  va_end(args);
+}
+
+// Emits a WARN-level message unless the logger is null or its threshold is
+// above WARN.
+static void Warnv(Logger* info_log, const char* format, va_list ap) {
+  if (info_log == nullptr ||
+      info_log->GetInfoLogLevel() > InfoLogLevel::WARN_LEVEL) {
+    return;
+  }
+  info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
+}
+
+// printf-style WARN logging through a raw Logger pointer.
+void Warn(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Warnv(info_log, format, args);
+  va_end(args);
+}
+
+// Emits an ERROR-level message unless the logger is null or its threshold is
+// above ERROR.
+static void Errorv(Logger* info_log, const char* format, va_list ap) {
+  if (info_log == nullptr ||
+      info_log->GetInfoLogLevel() > InfoLogLevel::ERROR_LEVEL) {
+    return;
+  }
+  info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
+}
+
+// printf-style ERROR logging through a raw Logger pointer.
+void Error(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Errorv(info_log, format, args);
+  va_end(args);
+}
+
+// Emits a FATAL-level message unless the logger is null or its threshold is
+// above FATAL.
+static void Fatalv(Logger* info_log, const char* format, va_list ap) {
+  if (info_log == nullptr ||
+      info_log->GetInfoLogLevel() > InfoLogLevel::FATAL_LEVEL) {
+    return;
+  }
+  info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
+}
+
+// printf-style FATAL logging through a raw Logger pointer.
+void Fatal(Logger* info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Fatalv(info_log, format, args);
+  va_end(args);
+}
+
+// shared_ptr convenience overload: flushes the pointed-to logger, if any.
+void LogFlush(const std::shared_ptr<Logger>& info_log) {
+  Logger* const raw_logger = info_log.get();
+  LogFlush(raw_logger);
+}
+
+// shared_ptr overload of printf-style logging at an explicit level.
+void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
+         const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Logv(log_level, info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style header logging.
+void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Headerv(info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style DEBUG logging.
+void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Debugv(info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style INFO logging.
+void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Infov(info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style WARN logging.
+void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Warnv(info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style ERROR logging.
+void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Errorv(info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style FATAL logging.
+void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Fatalv(info_log.get(), format, args);
+  va_end(args);
+}
+
+// shared_ptr overload of printf-style INFO logging.
+void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+  va_list args;
+  va_start(args, format);
+  Logv(info_log.get(), format, args);
+  va_end(args);
+}
+
+// Creates `fname` through `env` with default EnvOptions, appends `data`, and
+// syncs the file when `should_sync` is set. If the append or sync fails, the
+// file is deleted (that deletion's own status is not inspected) and the
+// failing status is returned.
+Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
+                         bool should_sync) {
+  std::unique_ptr<WritableFile> out;
+  const EnvOptions default_options;
+  Status s = env->NewWritableFile(fname, &out, default_options);
+  if (!s.ok()) {
+    return s;
+  }
+  s = out->Append(data);
+  if (should_sync && s.ok()) {
+    s = out->Sync();
+  }
+  if (s.ok()) {
+    return s;
+  }
+  env->DeleteFile(fname);
+  return s;
+}
+
+// Reads the whole of `fname` through `env` into `*data`.
+//
+// `*data` is cleared first, so on failure it holds whatever was read before
+// the error (possibly nothing). The file is read sequentially in fixed-size
+// chunks until a read returns an empty fragment or a non-OK status. That
+// status is returned, and it is OK when the end of the file is reached.
+Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
+  EnvOptions soptions;
+  data->clear();
+  std::unique_ptr<SequentialFile> file;
+  Status s = env->NewSequentialFile(fname, &file, soptions);
+  if (!s.ok()) {
+    return s;
+  }
+  static const int kBufferSize = 8192;
+  // Owned by a smart pointer so the scratch buffer is released on every exit
+  // path, including an exception thrown by data->append().
+  std::unique_ptr<char[]> space(new char[kBufferSize]);
+  while (true) {
+    Slice fragment;
+    s = file->Read(kBufferSize, &fragment, space.get());
+    if (!s.ok()) {
+      break;
+    }
+    data->append(fragment.data(), fragment.size());
+    if (fragment.empty()) {
+      // An empty fragment marks the end of the file.
+      break;
+    }
+  }
+  return s;
+}
+
+// Out-of-line destructor definition; nothing to release.
+EnvWrapper::~EnvWrapper() = default;
+
+namespace {  // file-local helpers
+
+// Copies into `*dst` every EnvOptions field that has a counterpart in the
+// DBOptions `src`. Fields not listed here are left untouched.
+void AssignEnvOptions(EnvOptions* dst, const DBOptions& src) {
+  // I/O mode switches.
+  dst->use_mmap_reads = src.allow_mmap_reads;
+  dst->use_mmap_writes = src.allow_mmap_writes;
+  dst->use_direct_reads = src.use_direct_reads;
+  dst->set_fd_cloexec = src.is_fd_close_on_exec;
+  dst->allow_fallocate = src.allow_fallocate;
+
+  // Sizes and pacing.
+  dst->bytes_per_sync = src.bytes_per_sync;
+  dst->compaction_readahead_size = src.compaction_readahead_size;
+  dst->random_access_max_buffer_size = src.random_access_max_buffer_size;
+  dst->writable_file_max_buffer_size = src.writable_file_max_buffer_size;
+
+  // Non-owning pointer; `src` keeps ownership of the rate limiter.
+  dst->rate_limiter = src.rate_limiter.get();
+}
+
+}  // anonymous namespace
+
+// Returns a copy of `env_options` with bytes_per_sync taken from the WAL
+// setting and the writable-file buffer size taken from `db_options`.
+EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
+                                    const DBOptions& db_options) const {
+  EnvOptions tuned(env_options);
+  tuned.writable_file_max_buffer_size =
+      db_options.writable_file_max_buffer_size;
+  tuned.bytes_per_sync = db_options.wal_bytes_per_sync;
+  return tuned;
+}
+
+// The base Env applies no manifest-write tuning: the options come back as an
+// unmodified copy.
+EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
+  EnvOptions unchanged(env_options);
+  return unchanged;
+}
+
+// Returns a copy of `env_options` with direct reads switched off.
+EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
+  EnvOptions tuned(env_options);
+  tuned.use_direct_reads = false;
+  return tuned;
+}
+
+// Returns a copy of `env_options` with direct reads switched off.
+EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
+  EnvOptions tuned(env_options);
+  tuned.use_direct_reads = false;
+  return tuned;
+}
+
+// Returns a copy of `env_options` whose use_direct_writes mirrors the
+// database's use_direct_io_for_flush_and_compaction setting.
+EnvOptions Env::OptimizeForCompactionTableWrite(
+    const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
+  EnvOptions tuned(env_options);
+  tuned.use_direct_writes = db_options.use_direct_io_for_flush_and_compaction;
+  return tuned;
+}
+
+// Returns a copy of `env_options` whose use_direct_reads mirrors the
+// database's use_direct_reads setting.
+EnvOptions Env::OptimizeForCompactionTableRead(
+    const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
+  EnvOptions tuned(env_options);
+  tuned.use_direct_reads = db_options.use_direct_reads;
+  return tuned;
+}
+
+// Builds EnvOptions from the matching fields of `options`.
+EnvOptions::EnvOptions(const DBOptions& options) {
+  EnvOptions* const self = this;
+  AssignEnvOptions(self, options);
+}
+
+// Builds EnvOptions from a default-constructed DBOptions.
+EnvOptions::EnvOptions() {
+  AssignEnvOptions(this, DBOptions());
+}
+
+
+} // namespace rocksdb
diff --git a/src/rocksdb/env/env_basic_test.cc b/src/rocksdb/env/env_basic_test.cc
new file mode 100644
index 00000000..3efae758a
--- /dev/null
+++ b/src/rocksdb/env/env_basic_test.cc
@@ -0,0 +1,354 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include <memory>
+#include <string>
+#include <vector>
+#include <algorithm>
+
+#include "env/mock_env.h"
+#include "rocksdb/env.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "util/testharness.h"
+
+namespace rocksdb {
+
+// EnvWrapper that smooths over trivial differences between Envs so the same
+// test cases can run against all of them: directory listings never contain
+// the "." and ".." entries.
+class NormalizingEnvWrapper : public EnvWrapper {
+ public:
+  explicit NormalizingEnvWrapper(Env* base) : EnvWrapper(base) {}
+
+  // Lists `dir` through the wrapped Env and drops "." and ".." on success.
+  Status GetChildren(const std::string& dir,
+                     std::vector<std::string>* result) override {
+    const Status status = EnvWrapper::GetChildren(dir, result);
+    if (!status.ok()) {
+      return status;
+    }
+    auto kept_end = std::remove_if(
+        result->begin(), result->end(),
+        [](const std::string& name) { return IsDotEntry(name); });
+    result->erase(kept_end, result->end());
+    return status;
+  }
+
+  // Same filtering for the attribute-returning listing.
+  Status GetChildrenFileAttributes(
+      const std::string& dir, std::vector<FileAttributes>* result) override {
+    const Status status = EnvWrapper::GetChildrenFileAttributes(dir, result);
+    if (!status.ok()) {
+      return status;
+    }
+    auto kept_end = std::remove_if(
+        result->begin(), result->end(),
+        [](const FileAttributes& attrs) { return IsDotEntry(attrs.name); });
+    result->erase(kept_end, result->end());
+    return status;
+  }
+
+ private:
+  // True for the two self/parent directory entries.
+  static bool IsDotEntry(const std::string& name) {
+    return name == "." || name == "..";
+  }
+};
+
+// Parameterized fixture: each instantiation supplies the Env under test. Tests
+// work inside the directory returned by test::PerThreadDBPath(), which is
+// created (if missing) before each test and emptied after it.
+class EnvBasicTestWithParam : public testing::Test,
+                              public ::testing::WithParamInterface<Env*> {
+ public:
+  Env* env_;
+  const EnvOptions soptions_;
+  std::string test_dir_;
+
+  EnvBasicTestWithParam()
+      : env_(GetParam()),
+        test_dir_(test::PerThreadDBPath(env_, "env_basic_test")) {}
+
+  void SetUp() override { env_->CreateDirIfMissing(test_dir_); }
+
+  void TearDown() override {
+    std::vector<std::string> entries;
+    env_->GetChildren(test_dir_, &entries);
+    for (const auto& entry : entries) {
+      // We cannot tell a file from a directory here, so try both. Tests only
+      // create files or empty directories, so one of the two deletions must
+      // succeed; otherwise the directory is corrupted.
+      const std::string path = test_dir_ + "/" + entry;
+      if (!env_->DeleteFile(path).ok()) {
+        ASSERT_OK(env_->DeleteDir(path));
+      }
+    }
+  }
+};
+
+// Same fixture under a second name, so that a separate group of tests can be
+// instantiated with its own set of Envs.
+class EnvMoreTestWithParam : public EnvBasicTestWithParam {
+};
+
+static std::unique_ptr<Env> def_env(new NormalizingEnvWrapper(Env::Default()));  // default Env with "." / ".." filtered out
+INSTANTIATE_TEST_CASE_P(EnvDefault, EnvBasicTestWithParam,
+ ::testing::Values(def_env.get()));  // basic tests on the default Env
+INSTANTIATE_TEST_CASE_P(EnvDefault, EnvMoreTestWithParam,
+ ::testing::Values(def_env.get()));  // extended tests on the default Env
+
+static std::unique_ptr<Env> mock_env(new MockEnv(Env::Default()));  // MockEnv built on the default Env
+INSTANTIATE_TEST_CASE_P(MockEnv, EnvBasicTestWithParam,
+ ::testing::Values(mock_env.get()));  // basic tests only; no EnvMoreTestWithParam instantiation for MockEnv
+#ifndef ROCKSDB_LITE
+static std::unique_ptr<Env> mem_env(NewMemEnv(Env::Default()));  // only compiled when ROCKSDB_LITE is not defined
+INSTANTIATE_TEST_CASE_P(MemEnv, EnvBasicTestWithParam,
+ ::testing::Values(mem_env.get()));  // basic tests only for the Env returned by NewMemEnv()
+
+namespace {
+
+// Returns a vector holding zero or one Env*, depending on whether an Env is
+// registered for the URI in the TEST_ENV_URI environment variable. The lookup
+// happens on the first call only; later calls reuse its outcome.
+//
+// An empty vector (rather than a nullptr entry) is returned when there is no
+// custom Env because gtest's ValuesIn() skips the tests when handed an empty
+// collection.
+std::vector<Env*> GetCustomEnvs() {
+  // Declared first so it already exists when the lookup below runs.
+  static std::unique_ptr<Env> custom_env_guard;
+  static Env* const custom_env = []() -> Env* {
+    const char* uri = getenv("TEST_ENV_URI");
+    if (uri == nullptr) {
+      return nullptr;
+    }
+    return NewCustomObject<Env>(uri, &custom_env_guard);
+  }();
+
+  std::vector<Env*> envs;
+  if (custom_env != nullptr) {
+    envs.emplace_back(custom_env);
+  }
+  return envs;
+}
+
+}  // anonymous namespace
+
+INSTANTIATE_TEST_CASE_P(CustomEnv, EnvBasicTestWithParam,
+ ::testing::ValuesIn(GetCustomEnvs()));  // empty collection when no custom Env was found
+
+INSTANTIATE_TEST_CASE_P(CustomEnv, EnvMoreTestWithParam,
+ ::testing::ValuesIn(GetCustomEnvs()));  // extended tests on the custom Env, if any
+
+#endif // ROCKSDB_LITE
+
+TEST_P(EnvBasicTestWithParam, Basics) {  // create / stat / list / rename / delete round trip
+ uint64_t file_size;
+ std::unique_ptr<WritableFile> writable_file;
+ std::vector<std::string> children;
+
+ // Precondition: a bogus name is absent and the test directory lists as empty.
+ ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/non_existent"));
+ ASSERT_TRUE(!env_->GetFileSize(test_dir_ + "/non_existent", &file_size).ok());
+ ASSERT_OK(env_->GetChildren(test_dir_, &children));
+ ASSERT_EQ(0U, children.size());
+
+ // Create an empty file.
+ ASSERT_OK(env_->NewWritableFile(test_dir_ + "/f", &writable_file, soptions_));
+ ASSERT_OK(writable_file->Close());
+ writable_file.reset();
+
+ // The file must now exist, have size 0, and be the only directory entry.
+ ASSERT_OK(env_->FileExists(test_dir_ + "/f"));
+ ASSERT_OK(env_->GetFileSize(test_dir_ + "/f", &file_size));
+ ASSERT_EQ(0U, file_size);
+ ASSERT_OK(env_->GetChildren(test_dir_, &children));
+ ASSERT_EQ(1U, children.size());
+ ASSERT_EQ("f", children[0]);
+ ASSERT_OK(env_->DeleteFile(test_dir_ + "/f"));
+
+ // Write three bytes to f1 and leave f2 empty.
+ ASSERT_OK(
+ env_->NewWritableFile(test_dir_ + "/f1", &writable_file, soptions_));
+ ASSERT_OK(writable_file->Append("abc"));
+ ASSERT_OK(writable_file->Close());
+ writable_file.reset();
+ ASSERT_OK(
+ env_->NewWritableFile(test_dir_ + "/f2", &writable_file, soptions_));
+ ASSERT_OK(writable_file->Close());
+ writable_file.reset();
+
+ // Check for expected size.
+ ASSERT_OK(env_->GetFileSize(test_dir_ + "/f1", &file_size));
+ ASSERT_EQ(3U, file_size);
+
+ // Renaming a missing source fails; renaming f1 to g moves name and contents.
+ ASSERT_TRUE(
+ !env_->RenameFile(test_dir_ + "/non_existent", test_dir_ + "/g").ok());
+ ASSERT_OK(env_->RenameFile(test_dir_ + "/f1", test_dir_ + "/g"));
+ ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/f1"));
+ ASSERT_OK(env_->FileExists(test_dir_ + "/g"));
+ ASSERT_OK(env_->GetFileSize(test_dir_ + "/g", &file_size));
+ ASSERT_EQ(3U, file_size);
+
+ // Renaming onto an existing file overwrites it: g takes on f2's empty contents.
+ ASSERT_OK(env_->RenameFile(test_dir_ + "/f2", test_dir_ + "/g"));
+ ASSERT_OK(env_->GetFileSize(test_dir_ + "/g", &file_size));
+ ASSERT_EQ(0U, file_size);
+
+ // Opening a non-existent file fails and leaves the output pointer empty.
+ std::unique_ptr<SequentialFile> seq_file;
+ std::unique_ptr<RandomAccessFile> rand_file;
+ ASSERT_TRUE(!env_->NewSequentialFile(test_dir_ + "/non_existent", &seq_file,
+ soptions_)
+ .ok());
+ ASSERT_TRUE(!seq_file);
+ ASSERT_TRUE(!env_->NewRandomAccessFile(test_dir_ + "/non_existent",
+ &rand_file, soptions_)
+ .ok());
+ ASSERT_TRUE(!rand_file);
+
+ // Deleting a missing file fails; deleting g empties the directory again.
+ ASSERT_TRUE(!env_->DeleteFile(test_dir_ + "/non_existent").ok());
+ ASSERT_OK(env_->DeleteFile(test_dir_ + "/g"));
+ ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/g"));
+ ASSERT_OK(env_->GetChildren(test_dir_, &children));
+ ASSERT_EQ(0U, children.size());
+ ASSERT_TRUE(
+ env_->GetChildren(test_dir_ + "/non_existent", &children).IsNotFound());  // listing a missing directory reports NotFound
+}
+
+TEST_P(EnvBasicTestWithParam, ReadWrite) {  // write "hello world", then read it back sequentially and randomly
+ std::unique_ptr<WritableFile> writable_file;
+ std::unique_ptr<SequentialFile> seq_file;
+ std::unique_ptr<RandomAccessFile> rand_file;
+ Slice result;
+ char scratch[100];  // large enough for the 11-byte file even when 1000 bytes are requested
+
+ ASSERT_OK(env_->NewWritableFile(test_dir_ + "/f", &writable_file, soptions_));
+ ASSERT_OK(writable_file->Append("hello "));
+ ASSERT_OK(writable_file->Append("world"));
+ ASSERT_OK(writable_file->Close());
+ writable_file.reset();
+
+ // Read sequentially.
+ ASSERT_OK(env_->NewSequentialFile(test_dir_ + "/f", &seq_file, soptions_));
+ ASSERT_OK(seq_file->Read(5, &result, scratch)); // Read "hello".
+ ASSERT_EQ(0, result.compare("hello"));
+ ASSERT_OK(seq_file->Skip(1));  // skip the space between the two words
+ ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Read "world".
+ ASSERT_EQ(0, result.compare("world"));
+ ASSERT_OK(seq_file->Read(1000, &result, scratch)); // Try reading past EOF.
+ ASSERT_EQ(0U, result.size());
+ ASSERT_OK(seq_file->Skip(100)); // Try to skip past end of file.
+ ASSERT_OK(seq_file->Read(1000, &result, scratch));
+ ASSERT_EQ(0U, result.size());
+
+ // Random reads.
+ ASSERT_OK(env_->NewRandomAccessFile(test_dir_ + "/f", &rand_file, soptions_));
+ ASSERT_OK(rand_file->Read(6, 5, &result, scratch)); // Read "world".
+ ASSERT_EQ(0, result.compare("world"));
+ ASSERT_OK(rand_file->Read(0, 5, &result, scratch)); // Read "hello".
+ ASSERT_EQ(0, result.compare("hello"));
+ ASSERT_OK(rand_file->Read(10, 100, &result, scratch)); // Read "d".
+ ASSERT_EQ(0, result.compare("d"));
+
+ // An offset beyond the end of the file is still expected to return OK.
+ ASSERT_TRUE(rand_file->Read(1000, 5, &result, scratch).ok());
+}
+
+// Sync, Flush and Close on a writable file that received no data.
+TEST_P(EnvBasicTestWithParam, Misc) {
+  const std::string path = test_dir_ + "/b";
+  std::unique_ptr<WritableFile> file;
+  ASSERT_OK(env_->NewWritableFile(path, &file, soptions_));
+
+  // Nothing was written, so these are no-ops; they must still succeed.
+  ASSERT_OK(file->Sync());
+  ASSERT_OK(file->Flush());
+  ASSERT_OK(file->Close());
+  file.reset();
+}
+
+// Writes "foo" followed by 300 KiB of patterned bytes, then reads everything
+// back sequentially and checks that the payload round-trips unchanged.
+TEST_P(EnvBasicTestWithParam, LargeWrite) {
+  const size_t kWriteSize = 300 * 1024;
+  // Smart-pointer ownership: ASSERT_* macros return from the test body on
+  // failure, which would leak a buffer released by a trailing delete[].
+  std::unique_ptr<char[]> scratch(new char[kWriteSize * 2]);
+
+  std::string write_data;
+  write_data.reserve(kWriteSize);
+  for (size_t i = 0; i < kWriteSize; ++i) {
+    write_data.append(1, static_cast<char>(i));
+  }
+
+  std::unique_ptr<WritableFile> writable_file;
+  ASSERT_OK(env_->NewWritableFile(test_dir_ + "/f", &writable_file, soptions_));
+  ASSERT_OK(writable_file->Append("foo"));
+  ASSERT_OK(writable_file->Append(write_data));
+  ASSERT_OK(writable_file->Close());
+  writable_file.reset();
+
+  std::unique_ptr<SequentialFile> seq_file;
+  Slice result;
+  ASSERT_OK(env_->NewSequentialFile(test_dir_ + "/f", &seq_file, soptions_));
+  ASSERT_OK(seq_file->Read(3, &result, scratch.get()));  // Read "foo".
+  ASSERT_EQ(0, result.compare("foo"));
+
+  size_t read = 0;
+  std::string read_data;
+  while (read < kWriteSize) {
+    ASSERT_OK(seq_file->Read(kWriteSize - read, &result, scratch.get()));
+    // A successful zero-byte read means the file ended early; fail here
+    // instead of looping forever.
+    ASSERT_GT(result.size(), 0U);
+    read_data.append(result.data(), result.size());
+    read += result.size();
+  }
+  ASSERT_TRUE(write_data == read_data);
+}
+
+// The modification time of a freshly created directory can be queried. Only
+// the returned status is checked, not the timestamp itself.
+TEST_P(EnvMoreTestWithParam, GetModTime) {
+  const std::string dir = test_dir_ + "/dir1";
+  ASSERT_OK(env_->CreateDirIfMissing(dir));
+  uint64_t mtime = 0x0;
+  ASSERT_OK(env_->GetFileModificationTime(dir, &mtime));
+}
+
+// CreateDir makes a directory that is visible to FileExists and GetChildren,
+// refuses to create it a second time, and DeleteDir removes it again.
+TEST_P(EnvMoreTestWithParam, MakeDir) {
+  ASSERT_OK(env_->CreateDir(test_dir_ + "/j"));
+  ASSERT_OK(env_->FileExists(test_dir_ + "/j"));
+  std::vector<std::string> children;
+  // Check the listing status too, so that a failed listing is reported as such
+  // rather than as a confusing child-count mismatch.
+  ASSERT_OK(env_->GetChildren(test_dir_, &children));
+  ASSERT_EQ(1U, children.size());
+  // A second CreateDir must fail because the directory already exists ...
+  ASSERT_TRUE(!env_->CreateDir(test_dir_ + "/j").ok());
+  // ... while CreateDirIfMissing accepts the existing directory.
+  ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/j"));
+  ASSERT_OK(env_->DeleteDir(test_dir_ + "/j"));
+  ASSERT_EQ(Status::NotFound(), env_->FileExists(test_dir_ + "/j"));
+}
+
+// Exercises GetChildren / GetChildrenFileAttributes on an empty directory, a
+// populated directory, a missing directory, and a path that names a file.
+TEST_P(EnvMoreTestWithParam, GetChildren) {
+  // An empty directory yields empty results.
+  std::vector<std::string> children;
+  std::vector<Env::FileAttributes> childAttr;
+  ASSERT_OK(env_->CreateDirIfMissing(test_dir_));
+  ASSERT_OK(env_->GetChildren(test_dir_, &children));
+  ASSERT_OK(env_->FileExists(test_dir_));
+  ASSERT_OK(env_->GetChildrenFileAttributes(test_dir_, &childAttr));
+  ASSERT_EQ(0U, children.size());
+  ASSERT_EQ(0U, childAttr.size());
+
+  // A directory with contents yields one entry per child, named relative to
+  // the test directory.
+  ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/niu"));
+  ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/you"));
+  ASSERT_OK(env_->CreateDirIfMissing(test_dir_ + "/guo"));
+  ASSERT_OK(env_->GetChildren(test_dir_, &children));
+  ASSERT_OK(env_->GetChildrenFileAttributes(test_dir_, &childAttr));
+  ASSERT_EQ(3U, children.size());
+  ASSERT_EQ(3U, childAttr.size());
+  // Empty the directory first; necessary for the default POSIX env before the
+  // DeleteDir below. Iterate by reference to avoid copying each name.
+  for (const auto& each : children) {
+    env_->DeleteDir(test_dir_ + "/" + each);
+  }
+
+  // A non-existent directory makes both listing calls fail.
+  ASSERT_OK(env_->DeleteDir(test_dir_));
+  ASSERT_TRUE(!env_->FileExists(test_dir_).ok());
+  ASSERT_TRUE(!env_->GetChildren(test_dir_, &children).ok());
+  ASSERT_TRUE(!env_->GetChildrenFileAttributes(test_dir_, &childAttr).ok());
+
+  // Listing a path that names a regular file fails and leaves no children.
+  ASSERT_OK(env_->CreateDir(test_dir_));
+  std::unique_ptr<WritableFile> writable_file;
+  ASSERT_OK(
+      env_->NewWritableFile(test_dir_ + "/file", &writable_file, soptions_));
+  ASSERT_OK(writable_file->Close());
+  writable_file.reset();
+  ASSERT_TRUE(!env_->GetChildren(test_dir_ + "/file", &children).ok());
+  ASSERT_EQ(0U, children.size());
+}
+
+} // namespace rocksdb
+// Test entry point: hands the command line to GoogleTest, runs every
+// registered test, and returns the result as the process exit status.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  const int exit_code = RUN_ALL_TESTS();
+  return exit_code;
+}
diff --git a/src/rocksdb/env/env_chroot.cc b/src/rocksdb/env/env_chroot.cc
new file mode 100644
index 00000000..8a7fb449
--- /dev/null
+++ b/src/rocksdb/env/env_chroot.cc
@@ -0,0 +1,321 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
+
+#include "env/env_chroot.h"
+
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "rocksdb/status.h"
+
+namespace rocksdb {
+
+class ChrootEnv : public EnvWrapper {
+ public:
+ ChrootEnv(Env* base_env, const std::string& chroot_dir)
+ : EnvWrapper(base_env) {  // wraps base_env; chroot_dir is canonicalized with realpath() below
+#if defined(OS_AIX)
+ char resolvedName[PATH_MAX];  // on AIX realpath() is given a caller-supplied buffer
+ char* real_chroot_dir = realpath(chroot_dir.c_str(), resolvedName);
+#else
+ char* real_chroot_dir = realpath(chroot_dir.c_str(), nullptr);  // result is heap-allocated; freed below
+#endif
+ // chroot_dir must exist so realpath() returns non-nullptr.
+ assert(real_chroot_dir != nullptr);  // NOTE(review): compiled out under NDEBUG, so a missing dir would reach the assignment as nullptr
+ chroot_dir_ = real_chroot_dir;  // presumably copies the string, since the buffer is freed right after; confirm chroot_dir_'s type
+#if !defined(OS_AIX)
+ free(real_chroot_dir);  // only the non-AIX branch allocated the buffer
+#endif
+ }
+
+ // Opens `fname` for sequential reads via the wrapped Env, using the path
+ // produced by EncodePathWithNewBasename(). An encoding failure is returned
+ // without touching the wrapped Env.
+ Status NewSequentialFile(const std::string& fname,
+                          std::unique_ptr<SequentialFile>* result,
+                          const EnvOptions& options) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::NewSequentialFile(encoded.second, result, options);
+   }
+   return encoded.first;
+ }
+
+ // Opens `fname` for random-access reads via the wrapped Env, using the path
+ // produced by EncodePathWithNewBasename().
+ Status NewRandomAccessFile(const std::string& fname,
+                            std::unique_ptr<RandomAccessFile>* result,
+                            const EnvOptions& options) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::NewRandomAccessFile(encoded.second, result, options);
+   }
+   return encoded.first;
+ }
+
+ // Creates a writable file at `fname` via the wrapped Env, using the path
+ // produced by EncodePathWithNewBasename().
+ Status NewWritableFile(const std::string& fname,
+                        std::unique_ptr<WritableFile>* result,
+                        const EnvOptions& options) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::NewWritableFile(encoded.second, result, options);
+   }
+   return encoded.first;
+ }
+
+ // Reuses the existing file `old_fname` as the new writable file `fname`.
+ //
+ // Both names are translated into the chroot first: `fname` with
+ // EncodePathWithNewBasename(), `old_fname` with EncodePath(). The first
+ // encoding failure is returned and the wrapped Env is not called.
+ Status ReuseWritableFile(const std::string& fname,
+                          const std::string& old_fname,
+                          std::unique_ptr<WritableFile>* result,
+                          const EnvOptions& options) override {
+   auto status_and_enc_path = EncodePathWithNewBasename(fname);
+   if (!status_and_enc_path.first.ok()) {
+     return status_and_enc_path.first;
+   }
+   auto status_and_old_enc_path = EncodePath(old_fname);
+   if (!status_and_old_enc_path.first.ok()) {
+     return status_and_old_enc_path.first;
+   }
+   // The first argument is the NEW name and the second the OLD one. Passing
+   // the encoded old path for both would make the wrapped Env reuse the file
+   // under its old name and never create `fname`.
+   return EnvWrapper::ReuseWritableFile(status_and_enc_path.second,
+                                        status_and_old_enc_path.second, result,
+                                        options);
+ }
+
+ // Opens `fname` for random reads and writes via the wrapped Env, using the
+ // path produced by EncodePathWithNewBasename().
+ Status NewRandomRWFile(const std::string& fname,
+                        std::unique_ptr<RandomRWFile>* result,
+                        const EnvOptions& options) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::NewRandomRWFile(encoded.second, result, options);
+   }
+   return encoded.first;
+ }
+
+ // Opens a Directory handle for `dir` via the wrapped Env, using the path
+ // produced by EncodePathWithNewBasename().
+ Status NewDirectory(const std::string& dir,
+                     std::unique_ptr<Directory>* result) override {
+   const auto encoded = EncodePathWithNewBasename(dir);
+   if (encoded.first.ok()) {
+     return EnvWrapper::NewDirectory(encoded.second, result);
+   }
+   return encoded.first;
+ }
+
+ // Existence check for `fname`, performed on the path produced by
+ // EncodePathWithNewBasename(). An encoding failure is returned in place of
+ // the wrapped Env's answer.
+ Status FileExists(const std::string& fname) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::FileExists(encoded.second);
+   }
+   return encoded.first;
+ }
+
+ // Lists `dir` via the wrapped Env, using the path produced by EncodePath().
+ Status GetChildren(const std::string& dir,
+                    std::vector<std::string>* result) override {
+   const auto encoded = EncodePath(dir);
+   if (encoded.first.ok()) {
+     return EnvWrapper::GetChildren(encoded.second, result);
+   }
+   return encoded.first;
+ }
+
+ // Lists `dir` with per-entry attributes via the wrapped Env, using the path
+ // produced by EncodePath().
+ Status GetChildrenFileAttributes(
+     const std::string& dir, std::vector<FileAttributes>* result) override {
+   const auto encoded = EncodePath(dir);
+   if (encoded.first.ok()) {
+     return EnvWrapper::GetChildrenFileAttributes(encoded.second, result);
+   }
+   return encoded.first;
+ }
+
+ // Deletes `fname` via the wrapped Env, using the path produced by
+ // EncodePath().
+ Status DeleteFile(const std::string& fname) override {
+   const auto encoded = EncodePath(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::DeleteFile(encoded.second);
+   }
+   return encoded.first;
+ }
+
+ // Creates directory `dirname` via the wrapped Env, using the path produced
+ // by EncodePathWithNewBasename().
+ Status CreateDir(const std::string& dirname) override {
+   const auto encoded = EncodePathWithNewBasename(dirname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::CreateDir(encoded.second);
+   }
+   return encoded.first;
+ }
+
+ // Creates directory `dirname` if it is missing, via the wrapped Env, using
+ // the path produced by EncodePathWithNewBasename().
+ Status CreateDirIfMissing(const std::string& dirname) override {
+   const auto encoded = EncodePathWithNewBasename(dirname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::CreateDirIfMissing(encoded.second);
+   }
+   return encoded.first;
+ }
+
+ // Deletes directory `dirname` via the wrapped Env, using the path produced
+ // by EncodePath().
+ Status DeleteDir(const std::string& dirname) override {
+   const auto encoded = EncodePath(dirname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::DeleteDir(encoded.second);
+   }
+   return encoded.first;
+ }
+
+ // Stores the size of `fname` in `*file_size` via the wrapped Env, using the
+ // path produced by EncodePath().
+ Status GetFileSize(const std::string& fname, uint64_t* file_size) override {
+   const auto encoded = EncodePath(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::GetFileSize(encoded.second, file_size);
+   }
+   return encoded.first;
+ }
+
+ // Stores the modification time of `fname` in `*file_mtime` via the wrapped
+ // Env, using the path produced by EncodePath().
+ Status GetFileModificationTime(const std::string& fname,
+                                uint64_t* file_mtime) override {
+   const auto encoded = EncodePath(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::GetFileModificationTime(encoded.second, file_mtime);
+   }
+   return encoded.first;
+ }
+
+ // Renames `src` to `dest` via the wrapped Env. The source is translated with
+ // EncodePath() and the destination with EncodePathWithNewBasename(); the
+ // first encoding failure is returned.
+ Status RenameFile(const std::string& src, const std::string& dest) override {
+   const auto encoded_src = EncodePath(src);
+   if (!encoded_src.first.ok()) {
+     return encoded_src.first;
+   }
+   const auto encoded_dest = EncodePathWithNewBasename(dest);
+   if (!encoded_dest.first.ok()) {
+     return encoded_dest.first;
+   }
+   return EnvWrapper::RenameFile(encoded_src.second, encoded_dest.second);
+ }
+
+ // Links `dest` to `src` via the wrapped Env. The source is translated with
+ // EncodePath() and the destination with EncodePathWithNewBasename(); the
+ // first encoding failure is returned.
+ Status LinkFile(const std::string& src, const std::string& dest) override {
+   const auto encoded_src = EncodePath(src);
+   if (!encoded_src.first.ok()) {
+     return encoded_src.first;
+   }
+   const auto encoded_dest = EncodePathWithNewBasename(dest);
+   if (!encoded_dest.first.ok()) {
+     return encoded_dest.first;
+   }
+   return EnvWrapper::LinkFile(encoded_src.second, encoded_dest.second);
+ }
+
+ // Forwards LockFile() to the wrapped Env using the chroot-prefixed path. The
+ // lock file itself may not exist yet, hence EncodePathWithNewBasename().
+ Status LockFile(const std::string& fname, FileLock** lock) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     // FileLock subclasses may remember the path they were given (e.g.,
+     // PosixFileLock does). The chroot directory is deliberately not stripped
+     // from it because callers shouldn't use that path.
+     return EnvWrapper::LockFile(encoded.second, lock);
+   }
+   return encoded.first;
+ }
+
+ // Stores "/rocksdbtest-<euid>" (a path relative to the chroot) in *path and
+ // makes a best-effort attempt to create it. Always returns OK.
+ Status GetTestDirectory(std::string* path) override {
+   // Adapted from PosixEnv's implementation since it doesn't provide a way to
+   // create directory in the chroot.
+   const int effective_uid = static_cast<int>(geteuid());
+   *path = "/rocksdbtest-" + std::to_string(effective_uid);
+
+   // The directory may already exist, so the result is intentionally ignored.
+   CreateDir(*path);
+   return Status::OK();
+ }
+
+ // Forwards NewLogger() to the wrapped Env using the chroot-prefixed path.
+ // The log file may not exist yet, hence EncodePathWithNewBasename().
+ Status NewLogger(const std::string& fname,
+                  std::shared_ptr<Logger>* result) override {
+   const auto encoded = EncodePathWithNewBasename(fname);
+   if (encoded.first.ok()) {
+     return EnvWrapper::NewLogger(encoded.second, result);
+   }
+   return encoded.first;
+ }
+
+ // Forwards GetAbsolutePath() to the wrapped Env using the chroot-prefixed
+ // form of `db_path`. *output_path is left untouched when encoding fails.
+ Status GetAbsolutePath(const std::string& db_path,
+                        std::string* output_path) override {
+   const auto encoded = EncodePath(db_path);
+   if (encoded.first.ok()) {
+     return EnvWrapper::GetAbsolutePath(encoded.second, output_path);
+   }
+   return encoded.first;
+ }
+
+ private:
+ // Returns status and expanded absolute path including the chroot directory.
+ // Checks whether the provided path breaks out of the chroot. If it returns
+ // non-OK status, the returned path should not be used.
+ std::pair<Status, std::string> EncodePath(const std::string& path) {
+   // Only absolute paths (relative to the chroot) are accepted.
+   if (path.empty() || path[0] != '/') {
+     return {Status::InvalidArgument(path, "Not an absolute path"), ""};
+   }
+   std::pair<Status, std::string> res;
+   res.second = chroot_dir_ + path;
+#if defined(OS_AIX)
+   // On AIX the result is written into a caller-provided buffer, so nothing
+   // has to be freed afterwards.
+   char resolvedName[PATH_MAX];
+   char* normalized_path = realpath(res.second.c_str(), resolvedName);
+#else
+   // realpath() allocates the result; it is released with free() below.
+   char* normalized_path = realpath(res.second.c_str(), nullptr);
+#endif
+   if (normalized_path == nullptr) {
+     res.first = Status::NotFound(res.second, strerror(errno));
+   } else {
+     const size_t root_len = chroot_dir_.size();
+     bool inside_chroot =
+         strlen(normalized_path) >= root_len &&
+         strncmp(normalized_path, chroot_dir_.c_str(), root_len) == 0;
+     // A bare prefix match would also accept a sibling such as "/jail-evil"
+     // when the chroot is "/jail". Unless the chroot string already ends in a
+     // separator, the prefix must be followed by end-of-string or '/'.
+     if (inside_chroot && root_len > 0 && chroot_dir_[root_len - 1] != '/') {
+       const char next = normalized_path[root_len];
+       inside_chroot = (next == '\0' || next == '/');
+     }
+     if (inside_chroot) {
+       res.first = Status::OK();
+     } else {
+       res.first = Status::IOError(res.second,
+                                   "Attempted to access path outside chroot");
+     }
+   }
+#if !defined(OS_AIX)
+   free(normalized_path);
+#endif
+   return res;
+ }
+
+ // Similar to EncodePath() except assumes the basename in the path hasn't been
+ // created yet.
+ std::pair<Status, std::string> EncodePathWithNewBasename(
+     const std::string& path) {
+   if (path.empty() || path[0] != '/') {
+     return {Status::InvalidArgument(path, "Not an absolute path"), ""};
+   }
+   // Locate the last character of the basename, skipping trailing slashes.
+   const std::string::size_type basename_end = path.find_last_not_of('/');
+   if (basename_end == std::string::npos) {
+     // The path consists solely of slashes: there is no basename to split off.
+     return EncodePath(path);
+   }
+
+   // realpath(3) (used by EncodePath()) requires a path that exists, so only
+   // the parent directory is resolved; the basename (plus any trailing
+   // slashes) is re-attached verbatim afterwards, whatever the status is.
+   const std::string::size_type parent_len = path.rfind('/', basename_end) + 1;
+   std::pair<Status, std::string> encoded =
+       EncodePath(std::string(path, 0, parent_len));
+   encoded.second.append(path, parent_len, std::string::npos);
+   return encoded;
+ }
+
+ std::string chroot_dir_;
+};
+
+// Creates a ChrootEnv layered on top of `base_env`. Returns nullptr when
+// base_env->FileExists(chroot_dir) does not report OK.
+Env* NewChrootEnv(Env* base_env, const std::string& chroot_dir) {
+  const bool chroot_exists = base_env->FileExists(chroot_dir).ok();
+  if (chroot_exists) {
+    return new ChrootEnv(base_env, chroot_dir);
+  }
+  return nullptr;
+}
+
+} // namespace rocksdb
+
+#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
diff --git a/src/rocksdb/env/env_chroot.h b/src/rocksdb/env/env_chroot.h
new file mode 100644
index 00000000..b2760bc0
--- /dev/null
+++ b/src/rocksdb/env/env_chroot.h
@@ -0,0 +1,22 @@
+// Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#pragma once
+
+#if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
+
+#include <string>
+
+#include "rocksdb/env.h"
+
+namespace rocksdb {
+
+// Returns an Env that translates paths such that the root directory appears to
+// be chroot_dir. chroot_dir should refer to an existing directory; if base_env
+// reports that it does not exist, nullptr is returned.
+Env* NewChrootEnv(Env* base_env, const std::string& chroot_dir);
+
+} // namespace rocksdb
+
+#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
diff --git a/src/rocksdb/env/env_encryption.cc b/src/rocksdb/env/env_encryption.cc
new file mode 100644
index 00000000..aa59e663
--- /dev/null
+++ b/src/rocksdb/env/env_encryption.cc
@@ -0,0 +1,927 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+
+#ifndef ROCKSDB_LITE
+
+#include <algorithm>
+#include <cctype>
+#include <iostream>
+#include <cassert>
+
+#include "rocksdb/env_encryption.h"
+#include "util/aligned_buffer.h"
+#include "util/coding.h"
+#include "util/random.h"
+
+#endif
+
+namespace rocksdb {
+
+#ifndef ROCKSDB_LITE
+
+// Sequential reader that decrypts everything read from an underlying
+// SequentialFile. The first `prefixLength` bytes of the underlying file are
+// treated as a prefix and are not part of the logical file content.
+class EncryptedSequentialFile : public SequentialFile {
+ public:
+  // Takes ownership of `f` and `s`. The underlying file is expected to be
+  // positioned at offset == prefixLength already.
+  EncryptedSequentialFile(SequentialFile* f, BlockAccessCipherStream* s, size_t prefixLength)
+      : file_(f), stream_(s), offset_(prefixLength), prefixLength_(prefixLength) {}
+
+  // Reads up to "n" bytes via the underlying file and decrypts them in place.
+  // "*result" is whatever the underlying file produced (possibly fewer than
+  // "n" bytes, possibly pointing into "scratch[0..n-1]", which therefore must
+  // stay alive while "*result" is used).
+  // Returns the underlying read error, or else the decryption status.
+  //
+  // REQUIRES: External synchronization
+  Status Read(size_t n, Slice* result, char* scratch) override {
+    assert(scratch);
+    Status status = file_->Read(n, result, scratch);
+    if (!status.ok()) {
+      return status;
+    }
+    const size_t bytes_read = result->size();
+    status = stream_->Decrypt(offset_, const_cast<char*>(result->data()), bytes_read);
+    // The data has already been read from disk, so the offset advances even
+    // if decryption failed.
+    offset_ += bytes_read;
+    return status;
+  }
+
+  // Skips "n" bytes in the underlying file and, on success, advances the
+  // tracked offset by "n".
+  //
+  // REQUIRES: External synchronization
+  Status Skip(uint64_t n) override {
+    Status status = file_->Skip(n);
+    if (status.ok()) {
+      offset_ += n;
+    }
+    return status;
+  }
+
+  // Reports whether the underlying file uses direct IO.
+  bool use_direct_io() const override { return file_->use_direct_io(); }
+
+  // Buffer alignment required by the underlying file (for Direct I/O).
+  size_t GetRequiredBufferAlignment() const override {
+    return file_->GetRequiredBufferAlignment();
+  }
+
+  // Forwards the cache invalidation to the underlying file, shifting the
+  // logical offset past the prefix.
+  Status InvalidateCache(size_t offset, size_t length) override {
+    return file_->InvalidateCache(offset + prefixLength_, length);
+  }
+
+  // Positioned read for direct I/O. `offset` is a logical offset; the prefix
+  // length is added before touching the underlying file. On a successful
+  // read the tracked sequential offset is moved to the end of the data read.
+  // If Direct I/O is enabled, offset, n, and scratch should be properly
+  // aligned.
+  Status PositionedRead(uint64_t offset, size_t n, Slice* result,
+                        char* scratch) override {
+    assert(scratch);
+    const uint64_t physical_offset = offset + prefixLength_;  // Skip prefix
+    Status status = file_->PositionedRead(physical_offset, n, result, scratch);
+    if (!status.ok()) {
+      return status;
+    }
+    offset_ = physical_offset + result->size();
+    return stream_->Decrypt(physical_offset, const_cast<char*>(result->data()),
+                            result->size());
+  }
+
+ private:
+  std::unique_ptr<SequentialFile> file_;
+  std::unique_ptr<BlockAccessCipherStream> stream_;
+  uint64_t offset_;      // Current position in the underlying file
+  size_t prefixLength_;  // Size of the prefix at the start of the file
+};
+
+// A file abstraction for randomly reading the contents of a file.
+// Random-access reader that decrypts everything read from an underlying
+// RandomAccessFile. Logical offsets are shifted by `prefixLength` before they
+// reach the underlying file.
+class EncryptedRandomAccessFile : public RandomAccessFile {
+ public:
+  // Takes ownership of `f` and `s`.
+  EncryptedRandomAccessFile(RandomAccessFile* f, BlockAccessCipherStream* s, size_t prefixLength)
+      : file_(f), stream_(s), prefixLength_(prefixLength) {}
+
+  // Reads up to "n" bytes starting at logical "offset" and decrypts them in
+  // place. "*result" is whatever the underlying file produced (possibly fewer
+  // than "n" bytes, possibly pointing into "scratch[0..n-1]", which therefore
+  // must stay alive while "*result" is used).
+  // Returns the underlying read error, or else the decryption status.
+  //
+  // If Direct I/O enabled, offset, n, and scratch should be aligned properly.
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) const override {
+    assert(scratch);
+    const uint64_t physical_offset = offset + prefixLength_;
+    Status status = file_->Read(physical_offset, n, result, scratch);
+    if (status.ok()) {
+      status = stream_->Decrypt(physical_offset,
+                                const_cast<char*>(result->data()),
+                                result->size());
+    }
+    return status;
+  }
+
+  // Asks the underlying file to read ahead n bytes from the logical offset.
+  Status Prefetch(uint64_t offset, size_t n) override {
+    return file_->Prefetch(offset + prefixLength_, n);
+  }
+
+  // Returns the unique ID reported by the underlying file: the number of
+  // bytes written to "id" (at most "max_size"), or whatever the underlying
+  // file returns when it cannot produce one.
+  size_t GetUniqueId(char* id, size_t max_size) const override {
+    return file_->GetUniqueId(id, max_size);
+  }
+
+  // Passes the access-pattern hint through to the underlying file.
+  void Hint(AccessPattern pattern) override { file_->Hint(pattern); }
+
+  // Reports whether the underlying file uses direct IO.
+  bool use_direct_io() const override { return file_->use_direct_io(); }
+
+  // Buffer alignment required by the underlying file (for Direct I/O).
+  size_t GetRequiredBufferAlignment() const override {
+    return file_->GetRequiredBufferAlignment();
+  }
+
+  // Forwards the cache invalidation to the underlying file, shifting the
+  // logical offset past the prefix.
+  Status InvalidateCache(size_t offset, size_t length) override {
+    return file_->InvalidateCache(offset + prefixLength_, length);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessFile> file_;
+  std::unique_ptr<BlockAccessCipherStream> stream_;
+  size_t prefixLength_;  // Size of the prefix at the start of the file
+};
+
+// A file abstraction for sequential writing. The implementation
+// must provide buffering since callers may append small fragments
+// at a time to the file.
+// Writer that encrypts data before handing it to an underlying WritableFile.
+// Callers pass plain text; an encrypted copy is what reaches the underlying
+// file. Logical offsets are shifted by `prefixLength`.
+class EncryptedWritableFile : public WritableFileWrapper {
+ public:
+  // Takes ownership of `f` and `s`. The prefix is assumed to have been
+  // written to `f` already.
+  EncryptedWritableFile(WritableFile* f, BlockAccessCipherStream* s, size_t prefixLength)
+      : WritableFileWrapper(f), file_(f), stream_(s), prefixLength_(prefixLength) {}
+
+  // Appends an encrypted copy of `data`. The cipher offset is the current
+  // size of the underlying file (prefix included). Empty data is forwarded
+  // without any encryption work.
+  Status Append(const Slice& data) override {
+    if (data.size() == 0) {
+      return file_->Append(data);
+    }
+    AlignedBuffer cipher_text;
+    const Status encrypt_status =
+        EncryptCopy(file_->GetFileSize(), data, &cipher_text);
+    if (!encrypt_status.ok()) {
+      return encrypt_status;
+    }
+    return file_->Append(Slice(cipher_text.BufferStart(), data.size()));
+  }
+
+  // Writes an encrypted copy of `data` at logical `offset` (the prefix
+  // length is added before encrypting and writing).
+  Status PositionedAppend(const Slice& data, uint64_t offset) override {
+    const uint64_t physical_offset = offset + prefixLength_;
+    if (data.size() == 0) {
+      return file_->PositionedAppend(data, physical_offset);
+    }
+    AlignedBuffer cipher_text;
+    const Status encrypt_status =
+        EncryptCopy(physical_offset, data, &cipher_text);
+    if (!encrypt_status.ok()) {
+      return encrypt_status;
+    }
+    return file_->PositionedAppend(
+        Slice(cipher_text.BufferStart(), data.size()), physical_offset);
+  }
+
+  // Reports whether the underlying file uses direct IO.
+  bool use_direct_io() const override { return file_->use_direct_io(); }
+
+  // Buffer alignment required by the underlying file (for Direct I/O).
+  size_t GetRequiredBufferAlignment() const override {
+    return file_->GetRequiredBufferAlignment();
+  }
+
+  // Size of the underlying file minus the prefix.
+  uint64_t GetFileSize() override {
+    return file_->GetFileSize() - prefixLength_;
+  }
+
+  // Truncates the underlying file to `size` logical bytes plus the prefix.
+  Status Truncate(uint64_t size) override {
+    return file_->Truncate(size + prefixLength_);
+  }
+
+  // Forwards the cache invalidation to the underlying file, shifting the
+  // logical offset past the prefix.
+  Status InvalidateCache(size_t offset, size_t length) override {
+    return file_->InvalidateCache(offset + prefixLength_, length);
+  }
+
+  // Forwards the range sync to the underlying file, shifting the logical
+  // offset past the prefix.
+  Status RangeSync(uint64_t offset, uint64_t nbytes) override {
+    return file_->RangeSync(offset + prefixLength_, nbytes);
+  }
+
+  // Forwards the write preparation to the underlying file, shifting the
+  // logical offset past the prefix.
+  void PrepareWrite(size_t offset, size_t len) override {
+    file_->PrepareWrite(offset + prefixLength_, len);
+  }
+
+  // Forwards the pre-allocation to the underlying file, shifting the logical
+  // offset past the prefix.
+  Status Allocate(uint64_t offset, uint64_t len) override {
+    return file_->Allocate(offset + prefixLength_, len);
+  }
+
+ private:
+  // Clones `data` into `out` (aligned as the underlying file requires) and
+  // encrypts the clone as if it were located at `physical_offset`.
+  Status EncryptCopy(uint64_t physical_offset, const Slice& data,
+                     AlignedBuffer* out) {
+    out->Alignment(GetRequiredBufferAlignment());
+    out->AllocateNewBuffer(data.size());
+    memmove(out->BufferStart(), data.data(), data.size());
+    return stream_->Encrypt(physical_offset, out->BufferStart(), data.size());
+  }
+
+  std::unique_ptr<WritableFile> file_;
+  std::unique_ptr<BlockAccessCipherStream> stream_;
+  size_t prefixLength_;  // Size of the prefix at the start of the file
+};
+
+// A file abstraction for random reading and writing.
+// Random read/write file that encrypts on Write() and decrypts on Read().
+// Logical offsets are shifted by `prefixLength` before they reach the
+// underlying RandomRWFile.
+class EncryptedRandomRWFile : public RandomRWFile {
+ public:
+  // Takes ownership of `f` and `s`.
+  EncryptedRandomRWFile(RandomRWFile* f, BlockAccessCipherStream* s, size_t prefixLength)
+      : file_(f), stream_(s), prefixLength_(prefixLength) {}
+
+  // Reports whether the underlying file uses direct IO.
+  bool use_direct_io() const override { return file_->use_direct_io(); }
+
+  // Buffer alignment required by the underlying file (for Direct I/O).
+  size_t GetRequiredBufferAlignment() const override {
+    return file_->GetRequiredBufferAlignment();
+  }
+
+  // Writes an encrypted copy of `data` at logical `offset`. Empty data is
+  // forwarded without any encryption work.
+  // Pass aligned buffer when use_direct_io() returns true.
+  Status Write(uint64_t offset, const Slice& data) override {
+    const uint64_t physical_offset = offset + prefixLength_;
+    const size_t length = data.size();
+    if (length == 0) {
+      return file_->Write(physical_offset, data);
+    }
+    // Encrypt a clone so the caller's buffer is left untouched.
+    AlignedBuffer cipher_text;
+    cipher_text.Alignment(GetRequiredBufferAlignment());
+    cipher_text.AllocateNewBuffer(length);
+    memmove(cipher_text.BufferStart(), data.data(), length);
+    const Status encrypt_status =
+        stream_->Encrypt(physical_offset, cipher_text.BufferStart(), length);
+    if (!encrypt_status.ok()) {
+      return encrypt_status;
+    }
+    return file_->Write(physical_offset,
+                        Slice(cipher_text.BufferStart(), length));
+  }
+
+  // Reads up to `n` bytes starting at logical `offset` into `result` and
+  // decrypts them in place. `scratch` must hold at least `n` bytes.
+  // Returns the underlying read error, or else the decryption status.
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) const override {
+    assert(scratch);
+    const uint64_t physical_offset = offset + prefixLength_;
+    Status status = file_->Read(physical_offset, n, result, scratch);
+    if (status.ok()) {
+      status = stream_->Decrypt(physical_offset,
+                                const_cast<char*>(result->data()),
+                                result->size());
+    }
+    return status;
+  }
+
+  // The remaining operations are forwarded to the underlying file unchanged.
+  Status Flush() override { return file_->Flush(); }
+
+  Status Sync() override { return file_->Sync(); }
+
+  Status Fsync() override { return file_->Fsync(); }
+
+  Status Close() override { return file_->Close(); }
+
+ private:
+  std::unique_ptr<RandomRWFile> file_;
+  std::unique_ptr<BlockAccessCipherStream> stream_;
+  size_t prefixLength_;  // Size of the prefix at the start of the file
+};
+
+// EncryptedEnv implements an Env wrapper that adds encryption to files stored on disk.
+// EncryptedEnv implements an Env wrapper that adds encryption to files stored
+// on disk. Every file starts with a prefix of provider_->GetPrefixLength()
+// bytes; the prefix is handed to the provider to build the cipher stream and
+// is hidden from callers (file sizes are reported without it).
+class EncryptedEnv : public EnvWrapper {
+ public:
+  // `provider` is stored as a raw pointer and is not deleted by this class.
+  EncryptedEnv(Env* base_env, EncryptionProvider *provider)
+      : EnvWrapper(base_env), provider_(provider) {}
+
+  // NewSequentialFile opens a file for sequential reading.
+  // Returns InvalidArgument if mmap reads are requested. Otherwise opens the
+  // file through the wrapped Env, reads the prefix, creates the cipher stream
+  // and stores the decrypting wrapper in *result. *result is reset up front
+  // and stays empty on failure.
+  Status NewSequentialFile(const std::string& fname,
+                           std::unique_ptr<SequentialFile>* result,
+                           const EnvOptions& options) override {
+    result->reset();
+    if (options.use_mmap_reads) {
+      return Status::InvalidArgument();
+    }
+    // Open file using underlying Env implementation
+    std::unique_ptr<SequentialFile> underlying;
+    auto status = EnvWrapper::NewSequentialFile(fname, &underlying, options);
+    if (!status.ok()) {
+      return status;
+    }
+    // Read prefix (if needed). Reading it also leaves the underlying file
+    // positioned just past the prefix.
+    AlignedBuffer prefixBuf;
+    Slice prefixSlice;
+    size_t prefixLength = provider_->GetPrefixLength();
+    if (prefixLength > 0) {
+      prefixBuf.Alignment(underlying->GetRequiredBufferAlignment());
+      prefixBuf.AllocateNewBuffer(prefixLength);
+      status = underlying->Read(prefixLength, &prefixSlice, prefixBuf.BufferStart());
+      if (!status.ok()) {
+        return status;
+      }
+    }
+    // Create cipher stream
+    std::unique_ptr<BlockAccessCipherStream> stream;
+    status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream);
+    if (!status.ok()) {
+      return status;
+    }
+    result->reset(new EncryptedSequentialFile(underlying.release(), stream.release(), prefixLength));
+    return Status::OK();
+  }
+
+  // NewRandomAccessFile opens a file for random read access.
+  // Returns InvalidArgument if mmap reads are requested. Otherwise opens the
+  // file through the wrapped Env, reads the prefix from offset 0, creates the
+  // cipher stream and stores the decrypting wrapper in *result. *result is
+  // reset up front and stays empty on failure.
+  Status NewRandomAccessFile(const std::string& fname,
+                             std::unique_ptr<RandomAccessFile>* result,
+                             const EnvOptions& options) override {
+    result->reset();
+    if (options.use_mmap_reads) {
+      return Status::InvalidArgument();
+    }
+    // Open file using underlying Env implementation
+    std::unique_ptr<RandomAccessFile> underlying;
+    auto status = EnvWrapper::NewRandomAccessFile(fname, &underlying, options);
+    if (!status.ok()) {
+      return status;
+    }
+    // Read prefix (if needed)
+    AlignedBuffer prefixBuf;
+    Slice prefixSlice;
+    size_t prefixLength = provider_->GetPrefixLength();
+    if (prefixLength > 0) {
+      prefixBuf.Alignment(underlying->GetRequiredBufferAlignment());
+      prefixBuf.AllocateNewBuffer(prefixLength);
+      status = underlying->Read(0, prefixLength, &prefixSlice, prefixBuf.BufferStart());
+      if (!status.ok()) {
+        return status;
+      }
+    }
+    // Create cipher stream
+    std::unique_ptr<BlockAccessCipherStream> stream;
+    status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream);
+    if (!status.ok()) {
+      return status;
+    }
+    result->reset(new EncryptedRandomAccessFile(underlying.release(), stream.release(), prefixLength));
+    return Status::OK();
+  }
+
+  // NewWritableFile opens a file for sequential writing.
+  // Returns InvalidArgument if mmap writes are requested. Otherwise opens the
+  // file through the wrapped Env, writes a new prefix and stores the
+  // encrypting wrapper in *result. *result is reset up front and stays empty
+  // on failure.
+  Status NewWritableFile(const std::string& fname,
+                         std::unique_ptr<WritableFile>* result,
+                         const EnvOptions& options) override {
+    result->reset();
+    if (options.use_mmap_writes) {
+      return Status::InvalidArgument();
+    }
+    // Open file using underlying Env implementation
+    std::unique_ptr<WritableFile> underlying;
+    Status status = EnvWrapper::NewWritableFile(fname, &underlying, options);
+    if (!status.ok()) {
+      return status;
+    }
+    return WrapWritableFile(fname, options, &underlying, result);
+  }
+
+  // Opens `fname` through the wrapped Env's ReopenWritableFile(), writes a
+  // new prefix and stores the encrypting wrapper in *result. Returns
+  // InvalidArgument if mmap writes are requested. *result is reset up front
+  // and stays empty on failure.
+  //
+  // NOTE(review): a fresh prefix is appended unconditionally, even if the
+  // reopened file already holds data - confirm this is intended.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  Status ReopenWritableFile(const std::string& fname,
+                            std::unique_ptr<WritableFile>* result,
+                            const EnvOptions& options) override {
+    result->reset();
+    if (options.use_mmap_writes) {
+      return Status::InvalidArgument();
+    }
+    // Open file using underlying Env implementation
+    std::unique_ptr<WritableFile> underlying;
+    Status status = EnvWrapper::ReopenWritableFile(fname, &underlying, options);
+    if (!status.ok()) {
+      return status;
+    }
+    return WrapWritableFile(fname, options, &underlying, result);
+  }
+
+  // Reuse an existing file by renaming it and opening it as writable.
+  // The rename/open is delegated to the wrapped Env; afterwards a new prefix
+  // is written and the encrypting wrapper is stored in *result. Returns
+  // InvalidArgument if mmap writes are requested.
+  Status ReuseWritableFile(const std::string& fname,
+                           const std::string& old_fname,
+                           std::unique_ptr<WritableFile>* result,
+                           const EnvOptions& options) override {
+    result->reset();
+    if (options.use_mmap_writes) {
+      return Status::InvalidArgument();
+    }
+    // Open file using underlying Env implementation
+    std::unique_ptr<WritableFile> underlying;
+    Status status = EnvWrapper::ReuseWritableFile(fname, old_fname, &underlying, options);
+    if (!status.ok()) {
+      return status;
+    }
+    return WrapWritableFile(fname, options, &underlying, result);
+  }
+
+  // Open `fname` for random read and write, if file doesn't exist the file
+  // will be created. On success, stores a pointer to the new file in
+  // *result and returns OK. On failure returns non-OK.
+  // An existing file has its prefix read from offset 0; a new file gets a
+  // freshly generated prefix written at offset 0. Returns InvalidArgument if
+  // mmap reads or writes are requested.
+  //
+  // The returned file will only be accessed by one thread at a time.
+  Status NewRandomRWFile(const std::string& fname,
+                         std::unique_ptr<RandomRWFile>* result,
+                         const EnvOptions& options) override {
+    result->reset();
+    if (options.use_mmap_reads || options.use_mmap_writes) {
+      return Status::InvalidArgument();
+    }
+    // Existence must be checked before opening, since opening may create the
+    // file.
+    bool isNewFile = !FileExists(fname).ok();
+
+    // Open file using underlying Env implementation
+    std::unique_ptr<RandomRWFile> underlying;
+    Status status = EnvWrapper::NewRandomRWFile(fname, &underlying, options);
+    if (!status.ok()) {
+      return status;
+    }
+    // Read or Initialize & write prefix (if needed)
+    AlignedBuffer prefixBuf;
+    Slice prefixSlice;
+    size_t prefixLength = provider_->GetPrefixLength();
+    if (prefixLength > 0) {
+      prefixBuf.Alignment(underlying->GetRequiredBufferAlignment());
+      prefixBuf.AllocateNewBuffer(prefixLength);
+      if (!isNewFile) {
+        // File already exists, read prefix
+        status = underlying->Read(0, prefixLength, &prefixSlice, prefixBuf.BufferStart());
+        if (!status.ok()) {
+          return status;
+        }
+      } else {
+        // File is new, initialize & write prefix
+        provider_->CreateNewPrefix(fname, prefixBuf.BufferStart(), prefixLength);
+        prefixSlice = Slice(prefixBuf.BufferStart(), prefixLength);
+        status = underlying->Write(0, prefixSlice);
+        if (!status.ok()) {
+          return status;
+        }
+      }
+    }
+    // Create cipher stream
+    std::unique_ptr<BlockAccessCipherStream> stream;
+    status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream);
+    if (!status.ok()) {
+      return status;
+    }
+    result->reset(new EncryptedRandomRWFile(underlying.release(), stream.release(), prefixLength));
+    return Status::OK();
+  }
+
+  // Store in *result the attributes of the children of the specified
+  // directory, as reported by the wrapped Env, with the prefix length
+  // subtracted from every reported size.
+  // Returns the wrapped Env's status on failure, OK otherwise.
+  Status GetChildrenFileAttributes(
+      const std::string& dir, std::vector<FileAttributes>* result) override {
+    auto status = EnvWrapper::GetChildrenFileAttributes(dir, result);
+    if (!status.ok()) {
+      return status;
+    }
+    const size_t prefixLength = provider_->GetPrefixLength();
+    for (auto& attributes : *result) {
+      // Every entry is assumed to be at least as large as the prefix.
+      assert(attributes.size_bytes >= prefixLength);
+      attributes.size_bytes -= prefixLength;
+    }
+    return Status::OK();
+  }
+
+  // Store the size of fname in *file_size, not counting the prefix.
+  Status GetFileSize(const std::string& fname, uint64_t* file_size) override {
+    auto status = EnvWrapper::GetFileSize(fname, file_size);
+    if (!status.ok()) {
+      return status;
+    }
+    const size_t prefixLength = provider_->GetPrefixLength();
+    // The file is assumed to be at least as large as the prefix.
+    assert(*file_size >= prefixLength);
+    *file_size -= prefixLength;
+    return Status::OK();
+  }
+
+ private:
+  // Shared tail of NewWritableFile / ReopenWritableFile / ReuseWritableFile:
+  // generates a new prefix (if the provider uses one), appends it to
+  // `*underlying`, creates the cipher stream from it and stores an
+  // EncryptedWritableFile owning the underlying file in *result.
+  // On failure *result is left untouched and `*underlying` keeps ownership.
+  Status WrapWritableFile(const std::string& fname, const EnvOptions& options,
+                          std::unique_ptr<WritableFile>* underlying,
+                          std::unique_ptr<WritableFile>* result) {
+    // Initialize & write prefix (if needed)
+    AlignedBuffer prefixBuf;
+    Slice prefixSlice;
+    const size_t prefixLength = provider_->GetPrefixLength();
+    if (prefixLength > 0) {
+      // Initialize prefix
+      prefixBuf.Alignment((*underlying)->GetRequiredBufferAlignment());
+      prefixBuf.AllocateNewBuffer(prefixLength);
+      provider_->CreateNewPrefix(fname, prefixBuf.BufferStart(), prefixLength);
+      prefixSlice = Slice(prefixBuf.BufferStart(), prefixLength);
+      // Write prefix
+      Status appendStatus = (*underlying)->Append(prefixSlice);
+      if (!appendStatus.ok()) {
+        return appendStatus;
+      }
+    }
+    // Create cipher stream
+    std::unique_ptr<BlockAccessCipherStream> stream;
+    Status status = provider_->CreateCipherStream(fname, options, prefixSlice, &stream);
+    if (!status.ok()) {
+      return status;
+    }
+    result->reset(new EncryptedWritableFile(underlying->release(), stream.release(), prefixLength));
+    return Status::OK();
+  }
+
+  EncryptionProvider *provider_;  // Not owned
+};
+
+
+// Returns an Env that encrypts data when stored on disk and decrypts data when
+// read from disk.
+Env* NewEncryptedEnv(Env* base_env, EncryptionProvider* provider) {
+  // The caller receives a heap-allocated EncryptedEnv wrapping `base_env`;
+  // `provider` is handed to it as a raw pointer.
+  Env* const encrypted_env = new EncryptedEnv(base_env, provider);
+  return encrypted_env;
+}
+
+// Encrypt one or more (partial) blocks of data at the file offset.
+// Length of data is given in dataSize.
+Status BlockAccessCipherStream::Encrypt(uint64_t fileOffset, char *data, size_t dataSize) {
+  // Locate the first block touched and the position inside it.
+  auto blockSize = BlockSize();
+  uint64_t blockIndex = fileOffset / blockSize;
+  size_t blockOffset = fileOffset % blockSize;
+  // Staging area for partial blocks, allocated on first use.
+  std::unique_ptr<char[]> partialBlock;
+
+  std::string scratch;
+  AllocateScratch(scratch);
+
+  // Walk the data one cipher block at a time.
+  for (;;) {
+    const size_t chunk = std::min(dataSize, blockSize - blockOffset);
+    const bool wholeBlock = (chunk == blockSize);
+    char *block = data;
+    if (!wholeBlock) {
+      // Only part of a block is covered by `data`: stage the plain text at
+      // its in-block position so that a full block can be encrypted.
+      if (!partialBlock) {
+        partialBlock.reset(new char[blockSize]);
+      }
+      block = partialBlock.get();
+      memmove(block + blockOffset, data, chunk);
+    }
+    const Status status =
+        EncryptBlock(blockIndex, block, const_cast<char*>(scratch.data()));
+    if (!status.ok()) {
+      return status;
+    }
+    if (!wholeBlock) {
+      // Copy the encrypted bytes back to `data`.
+      memmove(data, block + blockOffset, chunk);
+    }
+    dataSize -= chunk;
+    if (dataSize == 0) {
+      break;
+    }
+    // Every block after the first one starts at in-block offset 0.
+    data += chunk;
+    blockOffset = 0;
+    ++blockIndex;
+  }
+  return Status::OK();
+}
+
+// Decrypt one or more (partial) blocks of data at the file offset.
+// Length of data is given in dataSize.
+Status BlockAccessCipherStream::Decrypt(uint64_t fileOffset, char *data, size_t dataSize) {
+  // An empty buffer (e.g. a read that returned no bytes) needs no work.
+  if (dataSize == 0) {
+    return Status::OK();
+  }
+
+  // Calculate block index
+  auto blockSize = BlockSize();
+  uint64_t blockIndex = fileOffset / blockSize;
+  size_t blockOffset = fileOffset % blockSize;
+  // Staging area for partial blocks, allocated on first use.
+  std::unique_ptr<char[]> blockBuffer;
+
+  std::string scratch;
+  AllocateScratch(scratch);
+
+  // Note: fileOffset (a position in the file) and dataSize (the length of the
+  // buffer) are unrelated quantities, so nothing is asserted about their
+  // relative magnitude.
+
+  // Decrypt individual blocks.
+  while (1) {
+    char *block = data;
+    size_t n = std::min(dataSize, blockSize - blockOffset);
+    if (n != blockSize) {
+      // We're not decrypting a full block.
+      // Copy data to blockBuffer
+      if (!blockBuffer.get()) {
+        // Allocate a zero-filled buffer so that the bytes of the block that
+        // are not covered by `data` have a defined value.
+        blockBuffer = std::unique_ptr<char[]>(new char[blockSize]());
+      }
+      block = blockBuffer.get();
+      // Copy encrypted data to block buffer
+      memmove(block + blockOffset, data, n);
+    }
+    auto status = DecryptBlock(blockIndex, block, (char*)scratch.data());
+    if (!status.ok()) {
+      return status;
+    }
+    if (block != data) {
+      // Copy decrypted data back to `data`.
+      memmove(data, block + blockOffset, n);
+    }
+
+    // n is min(dataSize, ...), so the subtraction below cannot underflow.
+    assert(dataSize >= n);
+
+    dataSize -= n;
+    if (dataSize == 0) {
+      return Status::OK();
+    }
+    // Every block after the first one starts at in-block offset 0.
+    data += n;
+    blockOffset = 0;
+    blockIndex++;
+  }
+}
+
+// Encrypt a block of data in place by adding 13 to every byte.
+// Exactly blockSize_ bytes starting at `data` are modified.
+Status ROT13BlockCipher::Encrypt(char *data) {
+  char *const end = data + blockSize_;
+  for (char *cur = data; cur != end; ++cur) {
+    *cur += 13;
+  }
+  return Status::OK();
+}
+
+// Decrypt a block of data in place.
+// Length of data is equal to BlockSize().
+//
+// Encrypt() adds 13 to every byte. On 8-bit bytes that shift is not its own
+// inverse (applying it twice shifts by 26), so decryption subtracts 13 to
+// make Decrypt(Encrypt(x)) == x hold.
+Status ROT13BlockCipher::Decrypt(char *data) {
+  for (size_t i = 0; i < blockSize_; ++i) {
+    data[i] -= 13;
+  }
+  return Status::OK();
+}
+
+// Allocate scratch space which is passed to EncryptBlock/DecryptBlock.
+// After the call `scratch` holds one cipher block worth of bytes.
+void CTRCipherStream::AllocateScratch(std::string& scratch) {
+  auto blockSize = cipher_.BlockSize();
+  // resize(), not reserve(): callers write a full block through
+  // scratch.data(), and only bytes inside [0, size()) may be written.
+  // reserve() grows the capacity but leaves size() at 0.
+  scratch.resize(blockSize);
+}
+
+// Encrypt a block of data at the given block index, in place.
+// Both `data` and `scratch` are accessed for cipher_.BlockSize() bytes.
+Status CTRCipherStream::EncryptBlock(uint64_t blockIndex, char *data, char* scratch) {
+  const auto blockBytes = cipher_.BlockSize();
+
+  // Build nonce + counter in scratch: start from the IV, then overwrite the
+  // leading 8 bytes with the counter value for this block.
+  memmove(scratch, iv_.data(), blockBytes);
+  EncodeFixed64(scratch, blockIndex + initialCounter_);
+
+  // Encrypt nonce + counter with the block cipher.
+  const Status cipherStatus = cipher_.Encrypt(scratch);
+  if (!cipherStatus.ok()) {
+    return cipherStatus;
+  }
+
+  // XOR the data with the encrypted nonce + counter.
+  for (size_t pos = 0; pos < blockBytes; ++pos) {
+    data[pos] ^= scratch[pos];
+  }
+  return Status::OK();
+}
+
+// Decrypt a block of data at the given block index, in place.
+// Length of data is equal to BlockSize().
+Status CTRCipherStream::DecryptBlock(uint64_t blockIndex, char *data, char* scratch) {
+  // CTR decryption applies the same XOR as encryption, so simply delegate.
+  Status result = EncryptBlock(blockIndex, data, scratch);
+  return result;
+}
+
+// GetPrefixLength returns the length of the prefix that is added to every file
+// and used for storing encryption options.
+// For optimal performance, the prefix length should be a multiple of
+// the page size. This implementation always returns defaultPrefixLength.
+size_t CTREncryptionProvider::GetPrefixLength() {
+ return defaultPrefixLength;
+}
+
+// decodeCTRParameters extracts the initial counter and the IV from the
+// plain-text start of the given prefix.
+// `prefix` must provide at least 2 * blockSize readable bytes (and at least
+// 8 bytes for the counter). `iv` is set to point into `prefix`; nothing is
+// copied.
+static void decodeCTRParameters(const char *prefix, size_t blockSize, uint64_t &initialCounter, Slice &iv) {
+  // The second block holds the IV.
+  const char *ivStart = prefix + blockSize;
+  iv = Slice(ivStart, blockSize);
+  // The first block starts with the 64-bit initial counter.
+  initialCounter = DecodeFixed64(prefix);
+}
+
+// CreateNewPrefix initializes an allocated block of prefix memory
+// for a new file.
+//
+// prefix:       caller-allocated buffer of `prefixLength` bytes, overwritten.
+// prefixLength: size of `prefix`; must be at least two cipher blocks because
+//               block 0 carries the initial counter and block 1 the IV.
+//
+// Layout written: blocks 0 and 1 stay in plain text (random counter and IV),
+// everything from block 2 onwards is encrypted with a CTR stream built from
+// those two values.
+//
+// Returns InvalidArgument when the prefix is too small, otherwise the status
+// of encrypting the secret part.
+Status CTREncryptionProvider::CreateNewPrefix(const std::string& /*fname*/,
+                                              char* prefix,
+                                              size_t prefixLength) {
+  const auto blockSize = cipher_.BlockSize();
+  // Guard the `prefixLength - 2 * blockSize` computations below: with a
+  // shorter prefix the unsigned subtraction would wrap around and the
+  // encryption would run far past the end of the buffer.
+  if (prefixLength < 2 * blockSize) {
+    return Status::InvalidArgument(
+        "Prefix is too small to hold the initial counter and IV");
+  }
+
+  // Create & seed rnd.
+  // NOTE(review): seeding from the current time gives a predictable counter
+  // and IV; consider a cryptographically secure source.
+  Random rnd((uint32_t)Env::Default()->NowMicros());
+  // Fill entire prefix block with random values.
+  for (size_t i = 0; i < prefixLength; i++) {
+    prefix[i] = rnd.Uniform(256) & 0xFF;
+  }
+  // Take random data to extract initial counter & IV
+  uint64_t initialCounter;
+  Slice prefixIV;
+  decodeCTRParameters(prefix, blockSize, initialCounter, prefixIV);
+
+  // Now populate the rest of the prefix, starting from the third block.
+  PopulateSecretPrefixPart(prefix + (2 * blockSize), prefixLength - (2 * blockSize), blockSize);
+
+  // Encrypt the prefix, starting from block 2 (leave block 0, 1 with initial counter & IV unencrypted)
+  CTRCipherStream cipherStream(cipher_, prefixIV.data(), initialCounter);
+  return cipherStream.Encrypt(0, prefix + (2 * blockSize), prefixLength - (2 * blockSize));
+}
+
+// PopulateSecretPrefixPart initializes the data into a new prefix block
+// in plain text.
+// Returns the amount of space (starting from the start of the prefix)
+// that has been initialized. This base implementation writes nothing and
+// returns 0; all three parameters are unused.
+size_t CTREncryptionProvider::PopulateSecretPrefixPart(char* /*prefix*/,
+ size_t /*prefixLength*/,
+ size_t /*blockSize*/) {
+ // Nothing to do here, put in custom data in override when needed.
+ return 0;
+}
+
+// CreateCipherStream builds the cipher stream for a file from its prefix.
+//
+// fname:   file name, used in error messages and passed on.
+// options: environment options, passed on unchanged.
+// prefix:  prefix bytes of the file. Blocks 0 and 1 (initial counter and IV)
+//          are plain text; the remainder is decrypted IN PLACE by this call.
+// result:  receives the stream created by CreateCipherStreamFromPrefix.
+//
+// Returns Corruption when the prefix is shorter than two cipher blocks,
+// otherwise the first failing status of decryption or stream creation.
+Status CTREncryptionProvider::CreateCipherStream(
+    const std::string& fname, const EnvOptions& options, Slice& prefix,
+    std::unique_ptr<BlockAccessCipherStream>* result) {
+  auto blockSize = cipher_.BlockSize();
+
+  // Validate the prefix size BEFORE touching its contents. A prefix smaller
+  // than twice the block size cannot contain the counter and IV; decoding it
+  // would read out of bounds, and the size subtraction further down would
+  // wrap around. A short prefix is bad input data rather than a programming
+  // error, so it is reported as Corruption in every build type.
+  if (prefix.size() < 2 * blockSize) {
+    return Status::Corruption("Unable to read from file " + fname + ": read attempt would read beyond file bounds");
+  }
+
+  // Read plain text part of prefix.
+  uint64_t initialCounter;
+  Slice iv;
+  decodeCTRParameters(prefix.data(), blockSize, initialCounter, iv);
+
+  // Decrypt the encrypted part of the prefix, starting from block 2 (block 0, 1 with initial counter & IV are unencrypted)
+  CTRCipherStream cipherStream(cipher_, iv.data(), initialCounter);
+  auto status = cipherStream.Decrypt(0, (char*)prefix.data() + (2 * blockSize), prefix.size() - (2 * blockSize));
+  if (!status.ok()) {
+    return status;
+  }
+
+  // Create cipher stream
+  return CreateCipherStreamFromPrefix(fname, options, initialCounter, iv, prefix, result);
+}
+
+// CreateCipherStreamFromPrefix creates a block access cipher stream for a
+// file with the given name and options. The given prefix is already
+// decrypted. This implementation only uses `initialCounter` and `iv`; the
+// remaining arguments are ignored. Always returns OK.
+Status CTREncryptionProvider::CreateCipherStreamFromPrefix(
+    const std::string& /*fname*/, const EnvOptions& /*options*/,
+    uint64_t initialCounter, const Slice& iv, const Slice& /*prefix*/,
+    std::unique_ptr<BlockAccessCipherStream>* result) {
+  result->reset(new CTRCipherStream(cipher_, iv.data(), initialCounter));
+  return Status::OK();
+}
+
+#endif // ROCKSDB_LITE
+
+} // namespace rocksdb
diff --git a/src/rocksdb/env/env_hdfs.cc b/src/rocksdb/env/env_hdfs.cc
new file mode 100644
index 00000000..5acf9301
--- /dev/null
+++ b/src/rocksdb/env/env_hdfs.cc
@@ -0,0 +1,627 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+
+#include "rocksdb/env.h"
+#include "hdfs/env_hdfs.h"
+
+#ifdef USE_HDFS
+#ifndef ROCKSDB_HDFS_FILE_C
+#define ROCKSDB_HDFS_FILE_C
+
+#include <stdio.h>
+#include <sys/time.h>
+#include <time.h>
+#include <algorithm>
+#include <iostream>
+#include <sstream>
+#include "rocksdb/status.h"
+#include "util/logging.h"
+#include "util/string_util.h"
+
+#define HDFS_EXISTS 0
+#define HDFS_DOESNT_EXIST -1
+#define HDFS_SUCCESS 0
+
+//
+// This file defines an HDFS environment for rocksdb. It uses the libhdfs
+// api to access HDFS. All HDFS files created by one instance of rocksdb
+// will reside on the same HDFS cluster.
+//
+
+namespace rocksdb {
+
+namespace {
+
+// Build a Status from an errno value, using `context` as the first message
+// part and strerror(err_number) as the second.
+// ENOSPC maps to NoSpace, ENOENT to PathNotFound, anything else to IOError.
+static Status IOError(const std::string& context, int err_number) {
+  const char* const reason = strerror(err_number);
+  if (err_number == ENOSPC) {
+    return Status::NoSpace(context, reason);
+  }
+  if (err_number == ENOENT) {
+    return Status::PathNotFound(context, reason);
+  }
+  return Status::IOError(context, reason);
+}
+
+// Assume that there is one global logger for now. It is not thread-safe,
+// but need not be because the logger is initialized at db-open time.
+static Logger* mylog = nullptr;
+
+// Used for reading a file from HDFS. It implements both sequential-read
+// access methods as well as random read access methods.
+//
+// The file is opened in the constructor. A failed open leaves the handle
+// null, so callers must check isValid() before using the object.
+class HdfsReadableFile : virtual public SequentialFile,
+                         virtual public RandomAccessFile {
+ private:
+  hdfsFS fileSys_;        // file system the file was opened on
+  std::string filename_;  // path handed to hdfsOpenFile
+  hdfsFile hfile_;        // open handle, or nullptr when the open failed
+
+ public:
+  // Opens `fname` read-only on `fileSys`. Never fails directly; check
+  // isValid() afterwards.
+  HdfsReadableFile(hdfsFS fileSys, const std::string& fname)
+      : fileSys_(fileSys), filename_(fname), hfile_(nullptr) {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n",
+                    filename_.c_str());
+    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0);
+    ROCKS_LOG_DEBUG(mylog,
+                    "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n",
+                    filename_.c_str(), hfile_);
+  }
+
+  virtual ~HdfsReadableFile() {
+    // Only close a handle that was actually opened. Callers delete an
+    // invalid file and then report errno from the failed open; closing a
+    // null handle here is an error in its own right and would replace that
+    // errno with the one from the bogus close.
+    if (hfile_ != nullptr) {
+      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n",
+                      filename_.c_str());
+      hdfsCloseFile(fileSys_, hfile_);
+      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n",
+                      filename_.c_str());
+      hfile_ = nullptr;
+    }
+  }
+
+  // True when the constructor managed to open the file.
+  bool isValid() {
+    return hfile_ != nullptr;
+  }
+
+  // sequential access, read data at current offset in file
+  // Reads up to `n` bytes into `scratch`, looping until n bytes were read,
+  // end of file was reached or an error occurred. On success *result refers
+  // to the bytes read (possibly fewer than n). On error *result is left
+  // untouched and an errno-based status is returned.
+  // NOTE(review): `n` is narrowed to tSize; requests larger than tSize can
+  // represent are not handled here.
+  virtual Status Read(size_t n, Slice* result, char* scratch) {
+    Status s;
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n",
+                    filename_.c_str(), n);
+
+    char* buffer = scratch;
+    size_t total_bytes_read = 0;
+    tSize bytes_read = 0;
+    tSize remaining_bytes = (tSize)n;
+
+    // Read a total of n bytes repeatedly until we hit error or eof
+    while (remaining_bytes > 0) {
+      bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes);
+      if (bytes_read <= 0) {
+        break;
+      }
+      assert(bytes_read <= remaining_bytes);
+
+      total_bytes_read += bytes_read;
+      remaining_bytes -= bytes_read;
+      buffer += bytes_read;
+    }
+    assert(total_bytes_read <= n);
+
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n",
+                    filename_.c_str());
+
+    if (bytes_read < 0) {
+      s = IOError(filename_, errno);
+    } else {
+      *result = Slice(scratch, total_bytes_read);
+    }
+
+    return s;
+  }
+
+  // random access, read data from specified offset in file
+  // Issues a single hdfsPread of up to `n` bytes at `offset`. *result is
+  // always assigned: the bytes read on success, an empty slice on error.
+  virtual Status Read(uint64_t offset, size_t n, Slice* result,
+                      char* scratch) const {
+    Status s;
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n",
+                    filename_.c_str());
+    ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset,
+                                   (void*)scratch, (tSize)n);
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n",
+                    filename_.c_str());
+    *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read);
+    if (bytes_read < 0) {
+      // An error: return a non-ok status
+      s = IOError(filename_, errno);
+    }
+    return s;
+  }
+
+  // Advance the current file position by `n` bytes (tell + seek).
+  virtual Status Skip(uint64_t n) {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n",
+                    filename_.c_str());
+    // get current offset from file
+    tOffset current = hdfsTell(fileSys_, hfile_);
+    if (current < 0) {
+      return IOError(filename_, errno);
+    }
+    // seek to new offset in file
+    tOffset newoffset = current + n;
+    int val = hdfsSeek(fileSys_, hfile_, newoffset);
+    if (val < 0) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+  }
+
+ private:
+
+  // returns true if we are at the end of file, false otherwise
+  bool feof() {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n",
+                    filename_.c_str());
+    if (hdfsTell(fileSys_, hfile_) == fileSize()) {
+      return true;
+    }
+    return false;
+  }
+
+  // the current size of the file
+  // Throws HdfsFatalException when the path info cannot be obtained.
+  tOffset fileSize() {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n",
+                    filename_.c_str());
+    hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str());
+    tOffset size = 0L;
+    if (pFileInfo != nullptr) {
+      size = pFileInfo->mSize;
+      hdfsFreeFileInfo(pFileInfo, 1);
+    } else {
+      throw HdfsFatalException("fileSize on unknown file " + filename_);
+    }
+    return size;
+  }
+};
+
+// Appends to an existing file in HDFS.
+//
+// The file is opened write-only in the constructor and closed either by an
+// explicit Close() or by the destructor, whichever comes first.
+class HdfsWritableFile: public WritableFile {
+ private:
+  hdfsFS fileSys_;        // file system the file was opened on
+  std::string filename_;  // path handed to hdfsOpenFile
+  hdfsFile hfile_;        // open handle; nullptr once closed or if open failed
+
+ public:
+  // NOTE(review): the assert aborts debug builds on a failed open although
+  // callers check isValid() afterwards — confirm this is intended.
+  HdfsWritableFile(hdfsFS fileSys, const std::string& fname)
+      : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n",
+                    filename_.c_str());
+    hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0);
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n",
+                    filename_.c_str());
+    assert(hfile_ != nullptr);
+  }
+  virtual ~HdfsWritableFile() {
+    if (hfile_ != nullptr) {
+      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
+                      filename_.c_str());
+      hdfsCloseFile(fileSys_, hfile_);
+      ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
+                      filename_.c_str());
+      hfile_ = nullptr;
+    }
+  }
+
+  // If the file was successfully created, then this returns true.
+  // Otherwise returns false.
+  bool isValid() {
+    return hfile_ != nullptr;
+  }
+
+  // The name of the file, mostly needed for debug logging.
+  const std::string& getName() {
+    return filename_;
+  }
+
+  // Write the whole slice with one hdfsWrite call. Anything other than a
+  // complete write is reported as an errno-based error.
+  virtual Status Append(const Slice& data) {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n",
+                    filename_.c_str());
+    const char* src = data.data();
+    size_t left = data.size();
+    size_t ret = hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(left));
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n",
+                    filename_.c_str());
+    if (ret != left) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+  }
+
+  // No-op; always returns OK.
+  virtual Status Flush() {
+    return Status::OK();
+  }
+
+  // Flush and then hsync the file; the first failing call is reported.
+  virtual Status Sync() {
+    Status s;
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n",
+                    filename_.c_str());
+    if (hdfsFlush(fileSys_, hfile_) == -1) {
+      return IOError(filename_, errno);
+    }
+    if (hdfsHSync(fileSys_, hfile_) == -1) {
+      return IOError(filename_, errno);
+    }
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n",
+                    filename_.c_str());
+    return Status::OK();
+  }
+
+  // This is used by HdfsLogger to write data to the debug log file
+  virtual Status Append(const char* src, size_t size) {
+    if (hdfsWrite(fileSys_, hfile_, src, static_cast<tSize>(size)) !=
+        static_cast<tSize>(size)) {
+      return IOError(filename_, errno);
+    }
+    return Status::OK();
+  }
+
+  // Close the file. The handle is forgotten whether or not the close
+  // succeeds, so the destructor never closes it a second time.
+  virtual Status Close() {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n",
+                    filename_.c_str());
+    const int rc = hdfsCloseFile(fileSys_, hfile_);
+    // Save errno right away; the bookkeeping below must not disturb it.
+    const int close_errno = errno;
+    // libhdfs documents that the memory behind a valid handle is released
+    // by hdfsCloseFile even when the close reports an I/O error, so the
+    // handle must not be reused after this point.
+    hfile_ = nullptr;
+    if (rc != 0) {
+      return IOError(filename_, close_errno);
+    }
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n",
+                    filename_.c_str());
+    return Status::OK();
+  }
+};
+
+// The object that implements the debug logs to reside in HDFS.
+//
+// The logger owns the HdfsWritableFile handed to its constructor and deletes
+// it in its destructor. The file is created with `new` by the code that
+// builds the logger and is not released anywhere else, so without this the
+// file object and its open HDFS handle would leak.
+class HdfsLogger : public Logger {
+ private:
+  HdfsWritableFile* file_;  // owned; deleted in the destructor
+  uint64_t (*gettid_)();  // Return the thread id for the current thread
+
+  // Shared close logic: logs the event and detaches this logger from the
+  // global `mylog` pointer if it is the one registered there.
+  Status HdfsCloseHelper() {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n",
+                    file_->getName().c_str());
+    if (mylog != nullptr && mylog == this) {
+      mylog = nullptr;
+    }
+    return Status::OK();
+  }
+
+ protected:
+  virtual Status CloseImpl() override { return HdfsCloseHelper(); }
+
+ public:
+  // f:      destination file; ownership passes to the logger.
+  // gettid: callback returning the id of the calling thread.
+  HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)())
+      : file_(f), gettid_(gettid) {
+    ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n",
+                    file_->getName().c_str());
+  }
+
+  ~HdfsLogger() override {
+    if (!closed_) {
+      closed_ = true;
+      HdfsCloseHelper();
+    }
+    // Release the file only after the helper ran, because the helper still
+    // reads the file name.
+    delete file_;
+    file_ = nullptr;
+  }
+
+  using Logger::Logv;
+  // Format one log line ("<timestamp> <thread id hex> <message>\n") and
+  // append it to the file. Errors from Append/Flush are ignored.
+  void Logv(const char* format, va_list ap) override {
+    const uint64_t thread_id = (*gettid_)();
+
+    // We try twice: the first time with a fixed-size stack allocated buffer,
+    // and the second time with a much larger dynamically allocated buffer.
+    char buffer[500];
+    for (int iter = 0; iter < 2; iter++) {
+      char* base;
+      int bufsize;
+      if (iter == 0) {
+        bufsize = sizeof(buffer);
+        base = buffer;
+      } else {
+        bufsize = 30000;
+        base = new char[bufsize];
+      }
+      char* p = base;
+      char* limit = base + bufsize;
+
+      struct timeval now_tv;
+      gettimeofday(&now_tv, nullptr);
+      const time_t seconds = now_tv.tv_sec;
+      struct tm t;
+      localtime_r(&seconds, &t);
+      p += snprintf(p, limit - p,
+                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
+                    t.tm_year + 1900,
+                    t.tm_mon + 1,
+                    t.tm_mday,
+                    t.tm_hour,
+                    t.tm_min,
+                    t.tm_sec,
+                    static_cast<int>(now_tv.tv_usec),
+                    static_cast<long long unsigned int>(thread_id));
+
+      // Print the message. The va_list is copied because it is consumed by
+      // vsnprintf and may be needed again on the second iteration.
+      if (p < limit) {
+        va_list backup_ap;
+        va_copy(backup_ap, ap);
+        p += vsnprintf(p, limit - p, format, backup_ap);
+        va_end(backup_ap);
+      }
+
+      // Truncate to available space if necessary
+      if (p >= limit) {
+        if (iter == 0) {
+          continue;  // Try again with larger buffer
+        } else {
+          p = limit - 1;
+        }
+      }
+
+      // Add newline if necessary
+      if (p == base || p[-1] != '\n') {
+        *p++ = '\n';
+      }
+
+      assert(p <= limit);
+      file_->Append(base, p-base);
+      file_->Flush();
+      if (base != buffer) {
+        delete[] base;
+      }
+      break;
+    }
+  }
+};
+
+} // namespace
+
+// Finally, the hdfs environment: static string constants of HdfsEnv.
+
+const std::string HdfsEnv::kProto = "hdfs://";
+const std::string HdfsEnv::pathsep = "/";
+
+// Open a file for sequential reading.
+// On success *result owns the new file and OK is returned. On failure
+// *result is left empty and an errno-based status is returned.
+Status HdfsEnv::NewSequentialFile(const std::string& fname,
+                                  std::unique_ptr<SequentialFile>* result,
+                                  const EnvOptions& /*options*/) {
+  result->reset();
+  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
+  if (!f->isValid()) {
+    // Read errno BEFORE destroying the object: the destructor logs and
+    // calls into libhdfs, either of which may overwrite the errno left by
+    // the failed open.
+    const int open_errno = errno;
+    delete f;
+    return IOError(fname, open_errno);
+  }
+  // Plain derived-to-base conversion; no run-time cast is required.
+  result->reset(f);
+  return Status::OK();
+}
+
+// Open a file for random reading.
+// On success *result owns the new file and OK is returned. On failure
+// *result is left empty and an errno-based status is returned.
+Status HdfsEnv::NewRandomAccessFile(const std::string& fname,
+                                    std::unique_ptr<RandomAccessFile>* result,
+                                    const EnvOptions& /*options*/) {
+  result->reset();
+  HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname);
+  if (!f->isValid()) {
+    // Read errno BEFORE destroying the object: the destructor logs and
+    // calls into libhdfs, either of which may overwrite the errno left by
+    // the failed open.
+    const int open_errno = errno;
+    delete f;
+    return IOError(fname, open_errno);
+  }
+  // Plain derived-to-base conversion; no run-time cast is required.
+  result->reset(f);
+  return Status::OK();
+}
+
+// Create a new file for writing.
+// On success *result owns the new file and OK is returned. On failure
+// *result is left empty and an errno-based status is returned.
+Status HdfsEnv::NewWritableFile(const std::string& fname,
+                                std::unique_ptr<WritableFile>* result,
+                                const EnvOptions& /*options*/) {
+  result->reset();
+  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
+  if (!f->isValid()) {
+    // Read errno before running any further code so that the value from the
+    // failed open is the one that gets reported.
+    const int open_errno = errno;
+    delete f;
+    return IOError(fname, open_errno);
+  }
+  // Plain derived-to-base conversion; no run-time cast is required.
+  result->reset(f);
+  return Status::OK();
+}
+
+// Directory object handed out by HdfsEnv::NewDirectory. It only stores the
+// integer it was constructed with; Fsync() does nothing and reports success.
+class HdfsDirectory : public Directory {
+ private:
+  int fd_;
+
+ public:
+  explicit HdfsDirectory(int fd) : fd_(fd) {}
+
+  ~HdfsDirectory() {}
+
+  // Nothing to synchronize; always OK.
+  Status Fsync() override {
+    return Status::OK();
+  }
+
+  // The value passed to the constructor.
+  int GetFd() const {
+    return fd_;
+  }
+};
+
+// Create a Directory object for an existing HDFS path.
+// Returns OK with *result set when hdfsExists reports the path as present.
+// Any other hdfsExists result is logged and raised as HdfsFatalException.
+Status HdfsEnv::NewDirectory(const std::string& name,
+                             std::unique_ptr<Directory>* result) {
+  const int value = hdfsExists(fileSys_, name.c_str());
+  if (value != HDFS_EXISTS) {
+    // fail if the directory doesn't exist
+    ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed");
+    throw HdfsFatalException("hdfsExists call failed with error " +
+                             ToString(value) + " on path " + name +
+                             ".\n");
+  }
+  result->reset(new HdfsDirectory(0));
+  return Status::OK();
+}
+
+// Check whether `fname` exists.
+// Returns OK when it exists, NotFound when hdfsExists reports it missing,
+// and IOError (after logging) for any other hdfsExists result.
+Status HdfsEnv::FileExists(const std::string& fname) {
+  const int value = hdfsExists(fileSys_, fname.c_str());
+  if (value == HDFS_EXISTS) {
+    return Status::OK();
+  }
+  if (value == HDFS_DOESNT_EXIST) {
+    return Status::NotFound();
+  }
+  // anything else should be an error
+  ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed");
+  return Status::IOError("hdfsExists call failed with error " +
+                         ToString(value) + " on path " + fname + ".\n");
+}
+
+// List the entries of directory `path`.
+// The last path component (the text after the final '/') of every listed
+// entry is appended to *result; entries without a '/' are skipped.
+// Returns NotFound when the directory does not exist. Unexpected results
+// from hdfsExists or a negative entry count are logged and raised as
+// HdfsFatalException.
+Status HdfsEnv::GetChildren(const std::string& path,
+                            std::vector<std::string>* result) {
+  const int value = hdfsExists(fileSys_, path.c_str());
+  if (value == HDFS_DOESNT_EXIST) {
+    // directory does not exist, exit
+    return Status::NotFound();
+  }
+  if (value != HDFS_EXISTS) {
+    // anything else should be an error
+    ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed");
+    throw HdfsFatalException("hdfsExists call failed with error " +
+                             ToString(value) + ".\n");
+  }
+
+  // directory exists
+  int numEntries = 0;
+  hdfsFileInfo* entries =
+      hdfsListDirectory(fileSys_, path.c_str(), &numEntries);
+  if (numEntries < 0) {
+    // numEntries < 0 indicates error
+    ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error ");
+    throw HdfsFatalException(
+        "hdfsListDirectory call failed negative error.\n");
+  }
+
+  for (int idx = 0; idx < numEntries; ++idx) {
+    const std::string pathname(entries[idx].mName);
+    const size_t slash = pathname.rfind("/");
+    if (slash != std::string::npos) {
+      result->push_back(pathname.substr(slash + 1));
+    }
+  }
+  if (entries != nullptr) {
+    hdfsFreeFileInfo(entries, numEntries);
+  }
+  return Status::OK();
+}
+
+// Delete `fname` through hdfsDelete (third argument 1).
+// Returns OK on success, otherwise an errno-based status.
+Status HdfsEnv::DeleteFile(const std::string& fname) {
+  const int rc = hdfsDelete(fileSys_, fname.c_str(), 1);
+  if (rc != 0) {
+    return IOError(fname, errno);
+  }
+  return Status::OK();
+}
+
+// Create directory `name` through hdfsCreateDirectory.
+// Returns OK on success, otherwise an errno-based status.
+Status HdfsEnv::CreateDir(const std::string& name) {
+  const int rc = hdfsCreateDirectory(fileSys_, name.c_str());
+  if (rc != 0) {
+    return IOError(name, errno);
+  }
+  return Status::OK();
+}
+
+// Create directory `name` unless it already exists.
+// Not atomic: the state might change between hdfsExists and CreateDir.
+// Unexpected hdfsExists results are logged and raised as HdfsFatalException.
+Status HdfsEnv::CreateDirIfMissing(const std::string& name) {
+  const int value = hdfsExists(fileSys_, name.c_str());
+  if (value == HDFS_EXISTS) {
+    return Status::OK();
+  }
+  if (value == HDFS_DOESNT_EXIST) {
+    return CreateDir(name);
+  }
+  // anything else should be an error
+  ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed");
+  throw HdfsFatalException("hdfsExists call failed with error " +
+                           ToString(value) + ".\n");
+}
+
+// Delete directory `name`. Simply forwards to DeleteFile and returns its
+// status.
+Status HdfsEnv::DeleteDir(const std::string& name) {
+ return DeleteFile(name);
+};
+
+// Store the size of `fname` in *size.
+// *size is reset to 0 first and stays 0 when the path info cannot be
+// obtained, in which case an errno-based status is returned.
+Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) {
+  *size = 0L;
+  hdfsFileInfo* info = hdfsGetPathInfo(fileSys_, fname.c_str());
+  if (info == nullptr) {
+    return IOError(fname, errno);
+  }
+  *size = info->mSize;
+  hdfsFreeFileInfo(info, 1);
+  return Status::OK();
+}
+
+// Store the last-modification value (mLastMod) of `fname` in *time.
+// *time is left untouched when the path info cannot be obtained, in which
+// case an errno-based status is returned.
+Status HdfsEnv::GetFileModificationTime(const std::string& fname,
+                                        uint64_t* time) {
+  hdfsFileInfo* info = hdfsGetPathInfo(fileSys_, fname.c_str());
+  if (info == nullptr) {
+    return IOError(fname, errno);
+  }
+  *time = static_cast<uint64_t>(info->mLastMod);
+  hdfsFreeFileInfo(info, 1);
+  return Status::OK();
+}
+
+// Rename `src` to `target`.
+// The rename is not atomic. HDFS does not allow a renaming if the
+// target already exists, so the target is deleted first (the result of that
+// delete is ignored) and the rename is attempted afterwards.
+// Returns OK on success, otherwise an errno-based status naming `src`.
+Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) {
+  hdfsDelete(fileSys_, target.c_str(), 1);
+  const int rc = hdfsRename(fileSys_, src.c_str(), target.c_str());
+  if (rc != 0) {
+    return IOError(src, errno);
+  }
+  return Status::OK();
+}
+
+Status HdfsEnv::LockFile(const std::string& /*fname*/, FileLock** lock) {
+ // There isn't a very good way to atomically check and create
+ // a file via libhdfs, so no lock is taken: *lock is set to nullptr and OK is returned.
+ *lock = nullptr;
+ return Status::OK();
+}
+
+// Counterpart of LockFile: nothing was locked, so there is nothing to
+// release. Always returns OK.
+Status HdfsEnv::UnlockFile(FileLock* /*lock*/) {
+  return Status::OK();
+}
+
+// Create a logger that writes to the HDFS file `fname`.
+// On success *result holds the new logger and OK is returned. On failure
+// *result is reset and an errno-based status is returned.
+Status HdfsEnv::NewLogger(const std::string& fname,
+                          std::shared_ptr<Logger>* result) {
+  HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname);
+  if (!f->isValid()) {
+    // Read errno before running any further code so that the value from the
+    // failed open is the one that gets reported.
+    const int open_errno = errno;
+    delete f;
+    *result = nullptr;
+    return IOError(fname, open_errno);
+  }
+  HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid);
+  result->reset(h);
+  if (mylog == nullptr) {
+    // mylog = h; // uncomment this for detailed logging
+  }
+  return Status::OK();
+}
+
+// The factory method for creating an HDFS Env.
+// Stores a newly allocated HdfsEnv for `fsname` in *hdfs_env and returns OK.
+Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) {
+  HdfsEnv* const created = new HdfsEnv(fsname);
+  *hdfs_env = created;
+  return Status::OK();
+}
+} // namespace rocksdb
+
+#endif // ROCKSDB_HDFS_FILE_C
+
+#else // USE_HDFS
+
+// Dummy placeholders used when HDFS is not available (USE_HDFS undefined).
+// Both functions ignore their arguments and return NotSupported.
+namespace rocksdb {
+Status HdfsEnv::NewSequentialFile(const std::string& /*fname*/,
+ std::unique_ptr<SequentialFile>* /*result*/,
+ const EnvOptions& /*options*/) {
+ return Status::NotSupported("Not compiled with hdfs support");
+}
+
+ Status NewHdfsEnv(Env** /*hdfs_env*/, const std::string& /*fsname*/) {
+ return Status::NotSupported("Not compiled with hdfs support");
+ }
+}
+
+#endif
diff --git a/src/rocksdb/env/env_posix.cc b/src/rocksdb/env/env_posix.cc
new file mode 100644
index 00000000..387c0279
--- /dev/null
+++ b/src/rocksdb/env/env_posix.cc
@@ -0,0 +1,1128 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#if defined(OS_LINUX)
+#include <linux/fs.h>
+#endif
+#include <pthread.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
+#include <sys/statfs.h>
+#include <sys/syscall.h>
+#include <sys/sysmacros.h>
+#endif
+#include <sys/statvfs.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <time.h>
+#include <algorithm>
+// Get nano time includes
+#if defined(OS_LINUX) || defined(OS_FREEBSD)
+#elif defined(__MACH__)
+#include <mach/clock.h>
+#include <mach/mach.h>
+#else
+#include <chrono>
+#endif
+#include <deque>
+#include <set>
+#include <vector>
+
+#include "env/io_posix.h"
+#include "env/posix_logger.h"
+#include "monitoring/iostats_context_imp.h"
+#include "monitoring/thread_status_updater.h"
+#include "port/port.h"
+#include "rocksdb/options.h"
+#include "rocksdb/slice.h"
+#include "util/coding.h"
+#include "util/compression_context_cache.h"
+#include "util/logging.h"
+#include "util/random.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/thread_local.h"
+#include "util/threadpool_imp.h"
+
+#if !defined(TMPFS_MAGIC)
+#define TMPFS_MAGIC 0x01021994
+#endif
+#if !defined(XFS_SUPER_MAGIC)
+#define XFS_SUPER_MAGIC 0x58465342
+#endif
+#if !defined(EXT4_SUPER_MAGIC)
+#define EXT4_SUPER_MAGIC 0xEF53
+#endif
+
+namespace rocksdb {
+
+namespace {
+
+// Allocate a fresh ThreadStatusUpdater. The caller receives the raw pointer
+// returned by `new`.
+ThreadStatusUpdater* CreateThreadStatusUpdater() {
+  ThreadStatusUpdater* const updater = new ThreadStatusUpdater();
+  return updater;
+}
+
+// Permission bits used when creating database files: 0644 when non-owner
+// access is allowed, 0600 otherwise.
+inline mode_t GetDBFileMode(bool allow_non_owner_access) {
+  if (allow_non_owner_access) {
+    return 0644;
+  }
+  return 0600;
+}
+
+// List of pathnames that are locked, with its accompanying mutex.
+static std::set<std::string> lockedFiles;
+static port::Mutex mutex_lockedFiles;
+
+// Acquire (lock == true) or release (lock == false) a write lock covering
+// the whole file behind `fd`, using fcntl(F_SETLK).
+// errno is cleared before the call; the fcntl return value is passed back
+// unchanged.
+static int LockOrUnlock(int fd, bool lock) {
+  struct flock request;
+  memset(&request, 0, sizeof(request));
+  request.l_type = lock ? F_WRLCK : F_UNLCK;
+  request.l_whence = SEEK_SET;
+  request.l_start = 0;
+  request.l_len = 0;  // Lock/unlock entire file
+
+  errno = 0;
+  return fcntl(fd, F_SETLK, &request);
+}
+
+// FileLock implementation that carries a file descriptor and a file name.
+// NOTE(review): fd_ has no initializer here — presumably set by the code
+// that creates the lock; verify against the caller.
+class PosixFileLock : public FileLock {
+ public:
+ int fd_;
+ std::string filename;
+};
+
+// Return `flags` with O_CLOEXEC added when the platform defines it and
+// either no options are given or options->set_fd_cloexec is set.
+// Opening with close-on-exec already enabled avoids a race condition if a
+// db is opened around the same time that a child process is forked.
+int cloexec_flags(int flags, const EnvOptions* options) {
+#ifdef O_CLOEXEC
+  const bool want_cloexec = (options == nullptr) || options->set_fd_cloexec;
+  if (want_cloexec) {
+    flags |= O_CLOEXEC;
+  }
+#endif
+  return flags;
+}
+
+class PosixEnv : public Env {
+ public:
+ PosixEnv();
+
+  // Joins every thread recorded in threads_to_join_, then joins all threads
+  // of each thread pool, and finally releases the thread status updater.
+  ~PosixEnv() override {
+    for (auto it = threads_to_join_.begin(); it != threads_to_join_.end();
+         ++it) {
+      pthread_join(*it, nullptr);
+    }
+    for (int prio = 0; prio < Env::Priority::TOTAL; ++prio) {
+      thread_pools_[prio].JoinAllThreads();
+    }
+    // Keep the thread_status_updater_ alive when this Env is Env::Default().
+    // This avoids a use-after-free when Env::Default() is destructed while
+    // some other child threads are still trying to update thread status.
+    const bool is_default_env = (this == Env::Default());
+    if (!is_default_env) {
+      delete thread_status_updater_;
+    }
+  }
+
+  // Mark `fd` close-on-exec when no options are given or when
+  // options->set_fd_cloexec is set. Failures are ignored (best effort).
+  void SetFD_CLOEXEC(int fd, const EnvOptions* options) {
+    // Descriptor 0 is a valid descriptor (open() returns it when stdin has
+    // been closed), so only negative values are rejected.
+    if (fd < 0) {
+      return;
+    }
+    if (options != nullptr && !options->set_fd_cloexec) {
+      return;
+    }
+    const int current = fcntl(fd, F_GETFD);
+    if (current == -1) {
+      // Do not feed the error value (all bits set) back in as flags.
+      return;
+    }
+    fcntl(fd, F_SETFD, current | FD_CLOEXEC);
+  }
+
+  // Open `fname` for sequential reading.
+  // With direct reads (and no mmap reads) the raw descriptor is used,
+  // otherwise the descriptor is wrapped in a FILE* via fdopen.
+  // On success *result owns the new file. On failure *result stays empty,
+  // the descriptor is closed and an errno-based status is returned.
+  Status NewSequentialFile(const std::string& fname,
+                           std::unique_ptr<SequentialFile>* result,
+                           const EnvOptions& options) override {
+    result->reset();
+    int fd = -1;
+    int flags = cloexec_flags(O_RDONLY, &options);
+    FILE* file = nullptr;
+
+    if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef ROCKSDB_LITE
+      return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
+#endif  // ROCKSDB_LITE
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+      flags |= O_DIRECT;
+#endif
+    }
+
+    // Retry the open when it is interrupted by a signal.
+    do {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+    } while (fd < 0 && errno == EINTR);
+    if (fd < 0) {
+      return IOError("While opening a file for sequentially reading", fname,
+                     errno);
+    }
+
+    SetFD_CLOEXEC(fd, &options);
+
+    if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef OS_MACOSX
+      if (fcntl(fd, F_NOCACHE, 1) == -1) {
+        // Save errno first: close() may modify it.
+        const int fcntl_errno = errno;
+        close(fd);
+        return IOError("While fcntl NoCache", fname, fcntl_errno);
+      }
+#endif
+    } else {
+      do {
+        IOSTATS_TIMER_GUARD(open_nanos);
+        file = fdopen(fd, "r");
+      } while (file == nullptr && errno == EINTR);
+      if (file == nullptr) {
+        // Save errno first: close() may modify it.
+        const int fdopen_errno = errno;
+        close(fd);
+        return IOError("While opening file for sequentially read", fname,
+                       fdopen_errno);
+      }
+    }
+    result->reset(new PosixSequentialFile(fname, file, fd, options));
+    return Status::OK();
+  }
+
+  // Open `fname` for random reads.
+  // With mmap reads on a 64-bit build the whole file is mapped read-only,
+  // otherwise a descriptor-based file is created (optionally with direct
+  // I/O). On success *result owns the new file. On every failure path the
+  // descriptor is closed and *result stays empty.
+  Status NewRandomAccessFile(const std::string& fname,
+                             std::unique_ptr<RandomAccessFile>* result,
+                             const EnvOptions& options) override {
+    result->reset();
+    Status s;
+    int fd;
+    int flags = cloexec_flags(O_RDONLY, &options);
+
+    if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef ROCKSDB_LITE
+      return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
+#endif  // ROCKSDB_LITE
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+      flags |= O_DIRECT;
+      TEST_SYNC_POINT_CALLBACK("NewRandomAccessFile:O_DIRECT", &flags);
+#endif
+    }
+
+    // Retry the open when it is interrupted by a signal.
+    do {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+    } while (fd < 0 && errno == EINTR);
+    if (fd < 0) {
+      return IOError("While open a file for random read", fname, errno);
+    }
+    SetFD_CLOEXEC(fd, &options);
+
+    if (options.use_mmap_reads && sizeof(void*) >= 8) {
+      // Use of mmap for random reads has been removed because it
+      // kills performance when storage is fast.
+      // Use mmap when virtual address-space is plentiful.
+      uint64_t size;
+      s = GetFileSize(fname, &size);
+      if (s.ok()) {
+        void* base = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
+        if (base != MAP_FAILED) {
+          // The descriptor is handed to the new file object.
+          result->reset(new PosixMmapReadableFile(fd, fname, base,
+                                                  size, options));
+        } else {
+          s = IOError("while mmap file for read", fname, errno);
+          close(fd);
+        }
+      } else {
+        // The size lookup failed and nothing took over the descriptor:
+        // close it here, otherwise it would leak.
+        close(fd);
+      }
+    } else {
+      if (options.use_direct_reads && !options.use_mmap_reads) {
+#ifdef OS_MACOSX
+        if (fcntl(fd, F_NOCACHE, 1) == -1) {
+          // Save errno first: close() may modify it.
+          const int fcntl_errno = errno;
+          close(fd);
+          return IOError("while fcntl NoCache", fname, fcntl_errno);
+        }
+#endif
+      }
+      result->reset(new PosixRandomAccessFile(fname, fd, options));
+    }
+    return s;
+  }
+
+ // Open `fname` for writing and store the new file in *result.
+ // `reopen` selects O_APPEND (keep existing contents) instead of O_TRUNC.
+ // Depending on the options the result is a PosixMmapFile (mmap writes that
+ // are not forced off) or a PosixWritableFile (direct or buffered writes).
+ // Returns an errno-based status when open() or a platform-specific
+ // direct-I/O setup call fails; the descriptor is closed on those paths.
+ virtual Status OpenWritableFile(const std::string& fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options,
+ bool reopen = false) {
+ result->reset();
+ Status s;
+ int fd = -1;
+ int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
+ // Direct IO mode with O_DIRECT flag or F_NOCACHE (MAC OSX)
+ if (options.use_direct_writes && !options.use_mmap_writes) {
+ // Note: we should avoid O_APPEND here due to the following bug:
+ // POSIX requires that opening a file with the O_APPEND flag should
+ // have no effect on the location at which pwrite() writes data.
+ // However, on Linux, if a file is opened with O_APPEND, pwrite()
+ // appends data to the end of the file, regardless of the value of
+ // offset.
+ // More info here: https://linux.die.net/man/2/pwrite
+ // NOTE(review): `flags` still contains O_APPEND in this branch when
+ // `reopen` is true — confirm whether that is intended.
+#ifdef ROCKSDB_LITE
+ return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
+#endif // ROCKSDB_LITE
+ flags |= O_WRONLY;
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+ flags |= O_DIRECT;
+#endif
+ TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
+ } else if (options.use_mmap_writes) {
+ // non-direct I/O
+ flags |= O_RDWR;
+ } else {
+ flags |= O_WRONLY;
+ }
+
+ flags = cloexec_flags(flags, &options);
+
+ // Retry the open when it is interrupted by a signal.
+ do {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
+ } while (fd < 0 && errno == EINTR);
+
+ if (fd < 0) {
+ s = IOError("While open a file for appending", fname, errno);
+ return s;
+ }
+ SetFD_CLOEXEC(fd, &options);
+
+ if (options.use_mmap_writes) {
+ if (!checkedDiskForMmap_) {
+ // this will be executed once in the program's lifetime.
+ // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+ if (!SupportsFastAllocate(fname)) {
+ forceMmapOff_ = true;
+ }
+ checkedDiskForMmap_ = true;
+ }
+ }
+ if (options.use_mmap_writes && !forceMmapOff_) {
+ result->reset(new PosixMmapFile(fname, fd, page_size_, options));
+ } else if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef OS_MACOSX
+ if (fcntl(fd, F_NOCACHE, 1) == -1) {
+ close(fd);
+ s = IOError("While fcntl NoCache an opened file for appending", fname,
+ errno);
+ return s;
+ }
+#elif defined(OS_SOLARIS)
+ if (directio(fd, DIRECTIO_ON) == -1) {
+ if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
+ close(fd);
+ s = IOError("While calling directio()", fname, errno);
+ return s;
+ }
+ }
+#endif
+ result->reset(new PosixWritableFile(fname, fd, options));
+ } else {
+ // disable mmap writes
+ EnvOptions no_mmap_writes_options = options;
+ no_mmap_writes_options.use_mmap_writes = false;
+ result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
+ }
+ return s;
+ }
+
+  // Creates `fname` for writing, truncating it if it already exists.
+  // Thin wrapper over OpenWritableFile() with reopen disabled.
+  Status NewWritableFile(const std::string& fname,
+                         std::unique_ptr<WritableFile>* result,
+                         const EnvOptions& options) override {
+    const bool reopen_existing = false;
+    return OpenWritableFile(fname, result, options, reopen_existing);
+  }
+
+  // Opens `fname` for writing without truncating it (created if missing).
+  // Thin wrapper over OpenWritableFile() with reopen enabled.
+  Status ReopenWritableFile(const std::string& fname,
+                            std::unique_ptr<WritableFile>* result,
+                            const EnvOptions& options) override {
+    const bool reopen_existing = true;
+    return OpenWritableFile(fname, result, options, reopen_existing);
+  }
+
+  // Recycles the existing file `old_fname` as `fname`: opens `old_fname` for
+  // writing (no O_CREAT, no O_TRUNC), renames it to `fname`, and stores a
+  // WritableFile for the open descriptor in `*result`.
+  //
+  // `*result` is reset first and stays empty on failure. The kind of file
+  // object returned follows the same `options` rules as OpenWritableFile().
+  // Returns an IOError status if open(), rename() or enabling direct I/O
+  // fails.
+  Status ReuseWritableFile(const std::string& fname,
+                           const std::string& old_fname,
+                           std::unique_ptr<WritableFile>* result,
+                           const EnvOptions& options) override {
+    result->reset();
+    Status s;
+    int fd = -1;
+
+    int flags = 0;
+    // Direct IO mode with O_DIRECT flag or F_NOCACHE (MAC OSX)
+    if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef ROCKSDB_LITE
+      return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
+#endif  // ROCKSDB_LITE
+      flags |= O_WRONLY;
+#if !defined(OS_MACOSX) && !defined(OS_OPENBSD) && !defined(OS_SOLARIS)
+      flags |= O_DIRECT;
+#endif
+      TEST_SYNC_POINT_CALLBACK("NewWritableFile:O_DIRECT", &flags);
+    } else if (options.use_mmap_writes) {
+      // mmap needs O_RDWR mode
+      flags |= O_RDWR;
+    } else {
+      flags |= O_WRONLY;
+    }
+
+    flags = cloexec_flags(flags, &options);
+
+    // Retry the open for as long as it is interrupted by a signal.
+    do {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      fd = open(old_fname.c_str(), flags,
+                GetDBFileMode(allow_non_owner_access_));
+    } while (fd < 0 && errno == EINTR);
+    if (fd < 0) {
+      // The open was attempted on old_fname, so that is the path to report.
+      s = IOError("while reopen file for write", old_fname, errno);
+      return s;
+    }
+
+    SetFD_CLOEXEC(fd, &options);
+    // rename into place
+    if (rename(old_fname.c_str(), fname.c_str()) != 0) {
+      s = IOError("while rename file to " + fname, old_fname, errno);
+      close(fd);
+      return s;
+    }
+
+    if (options.use_mmap_writes) {
+      if (!checkedDiskForMmap_) {
+        // this will be executed once in the program's lifetime.
+        // do not use mmapWrite on non ext-3/xfs/tmpfs systems.
+        if (!SupportsFastAllocate(fname)) {
+          forceMmapOff_ = true;
+        }
+        checkedDiskForMmap_ = true;
+      }
+    }
+    if (options.use_mmap_writes && !forceMmapOff_) {
+      result->reset(new PosixMmapFile(fname, fd, page_size_, options));
+    } else if (options.use_direct_writes && !options.use_mmap_writes) {
+#ifdef OS_MACOSX
+      if (fcntl(fd, F_NOCACHE, 1) == -1) {
+        // Save errno first: close() below may overwrite it.
+        const int fcntl_errno = errno;
+        close(fd);
+        s = IOError("while fcntl NoCache for reopened file for append", fname,
+                    fcntl_errno);
+        return s;
+      }
+#elif defined(OS_SOLARIS)
+      if (directio(fd, DIRECTIO_ON) == -1) {
+        if (errno != ENOTTY) {  // ZFS filesystems don't support DIRECTIO_ON
+          // Save errno first: close() below may overwrite it.
+          const int directio_errno = errno;
+          close(fd);
+          s = IOError("while calling directio()", fname, directio_errno);
+          return s;
+        }
+      }
+#endif
+      result->reset(new PosixWritableFile(fname, fd, options));
+    } else {
+      // disable mmap writes
+      EnvOptions no_mmap_writes_options = options;
+      no_mmap_writes_options.use_mmap_writes = false;
+      result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
+    }
+    return s;
+  }
+
+  // Opens `fname` with O_RDWR (no O_CREAT) and wraps the descriptor in a
+  // PosixRandomRWFile stored in `*result`. An open() interrupted by a signal
+  // is retried; any other failure is returned as an IOError.
+  Status NewRandomRWFile(const std::string& fname,
+                         std::unique_ptr<RandomRWFile>* result,
+                         const EnvOptions& options) override {
+    const int open_flags = cloexec_flags(O_RDWR, &options);
+    int fd = -1;
+
+    for (;;) {
+      IOSTATS_TIMER_GUARD(open_nanos);
+
+      fd = open(fname.c_str(), open_flags,
+                GetDBFileMode(allow_non_owner_access_));
+      if (fd >= 0) {
+        break;
+      }
+      if (errno != EINTR) {
+        // Error while opening the file
+        return IOError("While open file for random read/write", fname, errno);
+      }
+      // EINTR: go around again.
+    }
+
+    SetFD_CLOEXEC(fd, &options);
+    result->reset(new PosixRandomRWFile(fname, fd, options));
+    return Status::OK();
+  }
+
+  // Maps the whole of `fname` read/write (MAP_SHARED) and stores a
+  // PosixMemoryMappedFileBuffer describing the mapping in `*result`.
+  //
+  // The mapping length is the size reported by GetFileSize(fname). The file
+  // descriptor is closed before returning, whether or not mapping succeeded.
+  // On failure `*result` is left untouched and an IOError (or the
+  // GetFileSize() status) is returned.
+  Status NewMemoryMappedFileBuffer(
+      const std::string& fname,
+      std::unique_ptr<MemoryMappedFileBuffer>* result) override {
+    Status status;
+    const int open_flags = cloexec_flags(O_RDWR, nullptr);
+    int fd = -1;
+
+    for (;;) {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      fd = open(fname.c_str(), open_flags, 0644);
+      if (fd >= 0) {
+        break;
+      }
+      if (errno == EINTR) {
+        // Interrupted by a signal: retry the open.
+        continue;
+      }
+      // Error while opening the file
+      status =
+          IOError("While open file for raw mmap buffer access", fname, errno);
+      break;
+    }
+
+    uint64_t size = 0;
+    if (status.ok()) {
+      status = GetFileSize(fname, &size);
+    }
+    if (status.ok()) {
+      const size_t map_len = static_cast<size_t>(size);
+      void* const base =
+          mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+      if (base == MAP_FAILED) {
+        status = IOError("while mmap file for read", fname, errno);
+      } else {
+        result->reset(new PosixMemoryMappedFileBuffer(base, map_len));
+      }
+    }
+    if (fd >= 0) {
+      // don't need to keep it open after mmap has been called
+      close(fd);
+    }
+    return status;
+  }
+
+  // Opens the directory `name` and stores a PosixDirectory owning the
+  // descriptor in `*result`. `*result` is reset first and stays empty when
+  // open() fails, in which case an IOError is returned.
+  Status NewDirectory(const std::string& name,
+                      std::unique_ptr<Directory>* result) override {
+    result->reset();
+    const int open_flags = cloexec_flags(0, nullptr);
+    int fd;
+    {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      fd = open(name.c_str(), open_flags);
+    }
+    if (fd < 0) {
+      return IOError("While open directory", name, errno);
+    }
+    result->reset(new PosixDirectory(fd));
+    return Status::OK();
+  }
+
+  // Reports whether `fname` exists, using access(F_OK).
+  // Returns OK if it does, NotFound for the errno values that mean the path
+  // cannot be reached (EACCES, ELOOP, ENAMETOOLONG, ENOENT, ENOTDIR), and
+  // IOError for anything else.
+  Status FileExists(const std::string& fname) override {
+    if (access(fname.c_str(), F_OK) == 0) {
+      return Status::OK();
+    }
+
+    const int err = errno;
+    const bool unreachable = err == EACCES || err == ELOOP ||
+                             err == ENAMETOOLONG || err == ENOENT ||
+                             err == ENOTDIR;
+    if (unreachable) {
+      return Status::NotFound();
+    }
+    // Only these two errors are expected to remain.
+    assert(err == EIO || err == ENOMEM);
+    return Status::IOError("Unexpected error(" + ToString(err) +
+                           ") accessing file `" + fname + "' ");
+  }
+
+  // Lists the entries of directory `dir` into `*result` (cleared first).
+  // Entry names are stored exactly as readdir() reports them.
+  //
+  // Returns NotFound when opendir() fails with EACCES, ENOENT or ENOTDIR,
+  // and IOError for any other opendir() failure or for a readdir() failure
+  // part-way through the listing.
+  Status GetChildren(const std::string& dir,
+                     std::vector<std::string>* result) override {
+    result->clear();
+    DIR* d = opendir(dir.c_str());
+    if (d == nullptr) {
+      switch (errno) {
+        case EACCES:
+        case ENOENT:
+        case ENOTDIR:
+          return Status::NotFound();
+        default:
+          return IOError("While opendir", dir, errno);
+      }
+    }
+    // readdir() returns nullptr both at end-of-directory and on error. The
+    // two are told apart by clearing errno beforehand and checking it after.
+    errno = 0;
+    struct dirent* entry;
+    while ((entry = readdir(d)) != nullptr) {
+      result->push_back(entry->d_name);
+      errno = 0;  // push_back may have touched errno; reset before next read
+    }
+    const int readdir_errno = errno;  // closedir() may overwrite errno
+    closedir(d);
+    if (readdir_errno != 0) {
+      return IOError("While readdir", dir, readdir_errno);
+    }
+    return Status::OK();
+  }
+
+  // Removes `fname` with unlink(); returns IOError if that fails.
+  Status DeleteFile(const std::string& fname) override {
+    if (unlink(fname.c_str()) == 0) {
+      return Status();
+    }
+    return IOError("while unlink() file", fname, errno);
+  }
+
+  // Creates directory `name` with mode 0755; returns IOError if mkdir()
+  // fails for any reason, including the directory already existing.
+  Status CreateDir(const std::string& name) override {
+    if (mkdir(name.c_str(), 0755) == 0) {
+      return Status();
+    }
+    return IOError("While mkdir", name, errno);
+  }
+
+  // Creates directory `name` with mode 0755 unless it already exists.
+  // An existing path that is not a directory is reported as an IOError.
+  Status CreateDirIfMissing(const std::string& name) override {
+    if (mkdir(name.c_str(), 0755) == 0) {
+      return Status();
+    }
+    if (errno != EEXIST) {
+      return IOError("While mkdir if missing", name, errno);
+    }
+    // Something is already there: check that it is actually a directory.
+    if (!DirExists(name)) {
+      // Message is taken from mkdir
+      return Status::IOError("`"+name+"' exists but is not a directory");
+    }
+    return Status();
+  }
+
+  // Removes directory `name` with rmdir(); returns IOError if that fails.
+  Status DeleteDir(const std::string& name) override {
+    if (rmdir(name.c_str()) == 0) {
+      return Status();
+    }
+    return IOError("file rmdir", name, errno);
+  }
+
+  // Stores the size of `fname` (st_size from stat()) in `*size`.
+  // On failure `*size` is set to 0 and an IOError is returned.
+  Status GetFileSize(const std::string& fname, uint64_t* size) override {
+    struct stat file_stat;
+    if (stat(fname.c_str(), &file_stat) == 0) {
+      *size = file_stat.st_size;
+      return Status();
+    }
+    *size = 0;
+    return IOError("while stat a file for size", fname, errno);
+  }
+
+  // Stores the modification time of `fname` (st_mtime from stat()) in
+  // `*file_mtime`. `*file_mtime` is left untouched if stat() fails.
+  Status GetFileModificationTime(const std::string& fname,
+                                 uint64_t* file_mtime) override {
+    struct stat file_stat;
+    const int rc = stat(fname.c_str(), &file_stat);
+    if (rc != 0) {
+      return IOError("while stat a file for modification time", fname, errno);
+    }
+    *file_mtime = static_cast<uint64_t>(file_stat.st_mtime);
+    return Status::OK();
+  }
+  // Renames `src` to `target` with rename(); returns IOError on failure.
+  Status RenameFile(const std::string& src,
+                    const std::string& target) override {
+    if (rename(src.c_str(), target.c_str()) == 0) {
+      return Status();
+    }
+    return IOError("While renaming a file to " + target, src, errno);
+  }
+
+  // Creates a hard link `target` referring to `src`.
+  // Returns NotSupported when the two paths are on different filesystems
+  // (EXDEV) and IOError for any other link() failure.
+  Status LinkFile(const std::string& src, const std::string& target) override {
+    if (link(src.c_str(), target.c_str()) == 0) {
+      return Status();
+    }
+    if (errno == EXDEV) {
+      return Status::NotSupported("No cross FS links allowed");
+    }
+    return IOError("while link file to " + target, src, errno);
+  }
+
+  // Stores the hard-link count of `fname` (st_nlink from stat()) in
+  // `*count`. `*count` is left untouched if stat() fails.
+  Status NumFileLinks(const std::string& fname, uint64_t* count) override {
+    struct stat file_stat;
+    const int rc = stat(fname.c_str(), &file_stat);
+    if (rc != 0) {
+      return IOError("while stat a file for num file links", fname, errno);
+    }
+    *count = static_cast<uint64_t>(file_stat.st_nlink);
+    return Status::OK();
+  }
+
+  // Sets `*res` to true when `first` and `second` resolve to the same file,
+  // i.e. stat() reports the same device (major and minor) and inode for
+  // both. Returns IOError, without touching `*res`, if either stat() fails.
+  Status AreFilesSame(const std::string& first, const std::string& second,
+                      bool* res) override {
+    struct stat first_stat;
+    if (stat(first.c_str(), &first_stat) != 0) {
+      return IOError("stat file", first, errno);
+    }
+    struct stat second_stat;
+    if (stat(second.c_str(), &second_stat) != 0) {
+      return IOError("stat file", second, errno);
+    }
+
+    const bool same_device =
+        major(first_stat.st_dev) == major(second_stat.st_dev) &&
+        minor(first_stat.st_dev) == minor(second_stat.st_dev);
+    const bool same_inode = first_stat.st_ino == second_stat.st_ino;
+    *res = same_device && same_inode;
+    return Status::OK();
+  }
+
+  // Acquires an exclusive lock on `fname` (creating the file if needed) and
+  // returns a handle for it in `*lock`. `*lock` is nullptr on failure.
+  //
+  // Fails with an ENOLCK IOError when this process already holds the lock
+  // (the path is present in lockedFiles), and with an IOError when open()
+  // or the underlying lock call fails. On any such failure the path is
+  // removed from lockedFiles again so a later attempt can succeed.
+  Status LockFile(const std::string& fname, FileLock** lock) override {
+    *lock = nullptr;
+    Status result;
+
+    mutex_lockedFiles.Lock();
+    // If it already exists in the lockedFiles set, then it is already locked,
+    // and fail this lock attempt. Otherwise, insert it into lockedFiles.
+    // This check is needed because fcntl() does not detect lock conflict
+    // if the fcntl is issued by the same thread that earlier acquired
+    // this lock.
+    // We must do this check *before* opening the file:
+    // Otherwise, we will open a new file descriptor. Locks are associated with
+    // a process, not a file descriptor and when *any* file descriptor is closed,
+    // all locks the process holds for that *file* are released
+    if (lockedFiles.insert(fname).second == false) {
+      mutex_lockedFiles.Unlock();
+      errno = ENOLCK;
+      return IOError("lock ", fname, errno);
+    }
+
+    int fd;
+    int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr);
+
+    {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      fd = open(fname.c_str(), flags, 0644);
+    }
+    if (fd < 0) {
+      result = IOError("while open a file for lock", fname, errno);
+    } else if (LockOrUnlock(fd, true) == -1) {
+      // Build the status before close() so errno still describes the lock
+      // failure.
+      result = IOError("While lock file", fname, errno);
+      close(fd);
+    } else {
+      SetFD_CLOEXEC(fd, nullptr);
+      PosixFileLock* my_lock = new PosixFileLock;
+      my_lock->fd_ = fd;
+      my_lock->filename = fname;
+      *lock = my_lock;
+    }
+
+    if (!result.ok()) {
+      // Whichever step failed, the lock is not held: drop the pathname that
+      // was inserted above so the file does not stay "locked" forever.
+      lockedFiles.erase(fname);
+    }
+
+    mutex_lockedFiles.Unlock();
+    return result;
+  }
+
+  // Releases a lock obtained from LockFile() and destroys the handle.
+  // The descriptor is closed and `lock` deleted even when an error is
+  // returned. Returns an ENOLCK IOError if the path was not recorded in
+  // lockedFiles, or an IOError if the underlying unlock call fails.
+  Status UnlockFile(FileLock* lock) override {
+    PosixFileLock* const held = reinterpret_cast<PosixFileLock*>(lock);
+    Status result;
+    mutex_lockedFiles.Lock();
+    // If we are unlocking, then verify that we had locked it earlier,
+    // it should already exist in lockedFiles. Remove it from lockedFiles.
+    const bool was_tracked = (lockedFiles.erase(held->filename) == 1);
+    if (!was_tracked) {
+      errno = ENOLCK;
+      result = IOError("unlock", held->filename, errno);
+    } else {
+      const int rc = LockOrUnlock(held->fd_, false);
+      if (rc == -1) {
+        result = IOError("unlock", held->filename, errno);
+      }
+    }
+    close(held->fd_);
+    delete held;
+    mutex_lockedFiles.Unlock();
+    return result;
+  }
+
+ void Schedule(void (*function)(void* arg1), void* arg, Priority pri = LOW,
+ void* tag = nullptr,
+ void (*unschedFunction)(void* arg) = nullptr) override;  // Defined after the class: forwards to thread_pools_[pri].Schedule().
+ // Defined after the class: returns thread_pools_[pri].UnSchedule(arg).
+ int UnSchedule(void* arg, Priority pri) override;
+ // Defined after the class: runs function(arg) on a new pthread that is recorded in threads_to_join_.
+ void StartThread(void (*function)(void* arg), void* arg) override;
+ // Defined after the class: pthread_join()s every thread in threads_to_join_, then clears the list.
+ void WaitForJoin() override;
+ // Defined after the class: returns thread_pools_[pri].GetQueueLen().
+ unsigned int GetThreadPoolQueueLen(Priority pri = LOW) const override;
+
+  // Stores the directory to use for tests in `*result`: $TEST_TMPDIR when it
+  // is set and non-empty, otherwise "/tmp/rocksdbtest-<euid>". The directory
+  // is then created on a best-effort basis; always returns OK.
+  Status GetTestDirectory(std::string* result) override {
+    const char* const tmpdir = getenv("TEST_TMPDIR");
+    const bool have_override = (tmpdir != nullptr) && (tmpdir[0] != '\0');
+    if (have_override) {
+      result->assign(tmpdir);
+    } else {
+      char fallback[100];
+      snprintf(fallback, sizeof(fallback), "/tmp/rocksdbtest-%d",
+               int(geteuid()));
+      result->assign(fallback);
+    }
+    // Directory may already exist, so the outcome is deliberately ignored.
+    CreateDir(*result);
+    return Status::OK();
+  }
+
+  // Fills `*thread_list` by delegating to thread_status_updater_, which
+  // must be non-null.
+  Status GetThreadList(std::vector<ThreadStatus>* thread_list) override {
+    assert(thread_status_updater_ != nullptr);
+    Status listing_status = thread_status_updater_->GetThreadList(thread_list);
+    return listing_status;
+  }
+
+  // Converts a pthread_t into a 64-bit id by copying its leading bytes
+  // (at most eight) into a zero-initialized integer.
+  static uint64_t gettid(pthread_t tid) {
+    uint64_t id_bits = 0;
+    const size_t copy_len = std::min(sizeof(id_bits), sizeof(tid));
+    memcpy(&id_bits, &tid, copy_len);
+    return id_bits;
+  }
+
+  // Same conversion, applied to the calling thread.
+  static uint64_t gettid() {
+    return gettid(pthread_self());
+  }
+
+  // Returns the gettid() conversion of the calling thread's pthread_t.
+  uint64_t GetThreadID() const override {
+    const pthread_t self = pthread_self();
+    return gettid(self);
+  }
+
+  // Stores the number of free bytes on the filesystem containing `fname`
+  // in `*free_space`, computed from statvfs() as (block size * f_bfree).
+  // Returns IOError, leaving `*free_space` untouched, if statvfs() fails.
+  Status GetFreeSpace(const std::string& fname, uint64_t* free_space) override {
+    struct statvfs sbuf;
+
+    if (statvfs(fname.c_str(), &sbuf) < 0) {
+      return IOError("While doing statvfs", fname, errno);
+    }
+
+    // POSIX counts f_bfree in units of f_frsize (the fundamental block
+    // size), not f_bsize (the preferred I/O size); the two can differ.
+    // Fall back to f_bsize only if the filesystem reports no f_frsize.
+    const uint64_t block_size =
+        (sbuf.f_frsize != 0) ? static_cast<uint64_t>(sbuf.f_frsize)
+                             : static_cast<uint64_t>(sbuf.f_bsize);
+    *free_space = block_size * static_cast<uint64_t>(sbuf.f_bfree);
+    return Status::OK();
+  }
+
+  // Opens `fname` for writing (truncating it) and stores a PosixLogger that
+  // writes to it in `*result`. When fopen() fails, `*result` is reset and an
+  // IOError is returned.
+  Status NewLogger(const std::string& fname,
+                   std::shared_ptr<Logger>* result) override {
+    // The "e" suffix is a glibc (>= 2.7) extension to enable O_CLOEXEC.
+    const char* const open_mode = "w"
+#ifdef __GLIBC_PREREQ
+#if __GLIBC_PREREQ(2, 7)
+        "e"
+#endif
+#endif
+        ;
+    FILE* log_file;
+    {
+      IOSTATS_TIMER_GUARD(open_nanos);
+      log_file = fopen(fname.c_str(), open_mode);
+    }
+    if (log_file == nullptr) {
+      result->reset();
+      return IOError("when fopen a file for new logger", fname, errno);
+    }
+
+    const int fd = fileno(log_file);
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+    // Reserve the first 4 KiB without changing the reported file size.
+    fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, 4 * 1024);
+#endif
+    SetFD_CLOEXEC(fd, nullptr);
+    result->reset(new PosixLogger(log_file, &PosixEnv::gettid, this));
+    return Status::OK();
+  }
+
+  // Returns the gettimeofday() time expressed in microseconds.
+  uint64_t NowMicros() override {
+    struct timeval now;
+    gettimeofday(&now, nullptr);
+    const uint64_t whole_seconds = static_cast<uint64_t>(now.tv_sec);
+    return whole_seconds * 1000000 + now.tv_usec;
+  }
+
+  // Returns a nanosecond timestamp from a platform-specific clock:
+  // CLOCK_MONOTONIC on Linux/FreeBSD/AIX, gethrtime() on Solaris, the Mach
+  // CALENDAR_CLOCK service on Mach, and std::chrono::steady_clock elsewhere.
+  uint64_t NowNanos() override {
+#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX)
+    struct timespec now;
+    clock_gettime(CLOCK_MONOTONIC, &now);
+    const uint64_t whole_seconds = static_cast<uint64_t>(now.tv_sec);
+    return whole_seconds * 1000000000 + now.tv_nsec;
+#elif defined(OS_SOLARIS)
+    return gethrtime();
+#elif defined(__MACH__)
+    clock_serv_t clock_service;
+    mach_timespec_t now;
+    host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &clock_service);
+    clock_get_time(clock_service, &now);
+    mach_port_deallocate(mach_task_self(), clock_service);
+    const uint64_t whole_seconds = static_cast<uint64_t>(now.tv_sec);
+    return whole_seconds * 1000000000 + now.tv_nsec;
+#else
+    const auto since_epoch =
+        std::chrono::steady_clock::now().time_since_epoch();
+    return std::chrono::duration_cast<std::chrono::nanoseconds>(since_epoch)
+        .count();
+#endif
+  }
+
+  // Returns the calling thread's CPU time in nanoseconds
+  // (CLOCK_THREAD_CPUTIME_ID) on Linux/FreeBSD/AIX/Mach, and 0 on every
+  // other platform.
+  uint64_t NowCPUNanos() override {
+#if defined(OS_LINUX) || defined(OS_FREEBSD) || defined(OS_AIX) || \
+    defined(__MACH__)
+    struct timespec cpu_time;
+    clock_gettime(CLOCK_THREAD_CPUTIME_ID, &cpu_time);
+    const uint64_t whole_seconds = static_cast<uint64_t>(cpu_time.tv_sec);
+    return whole_seconds * 1000000000 + cpu_time.tv_nsec;
+#else
+    return 0;
+#endif
+  }
+
+  // Suspends the calling thread via usleep(micros).
+  void SleepForMicroseconds(int micros) override {
+    usleep(micros);
+  }
+
+  // Writes the host name into `name`, a caller-provided buffer of `len`
+  // bytes, using gethostname().
+  // Returns InvalidArgument for EFAULT/EINVAL and IOError for any other
+  // failure.
+  Status GetHostName(char* name, uint64_t len) override {
+    const int ret = gethostname(name, static_cast<size_t>(len));
+    if (ret < 0) {
+      // Read errno once so later library calls cannot change the answer.
+      const int err = errno;
+      if (err == EFAULT || err == EINVAL) {
+        return Status::InvalidArgument(strerror(err));
+      }
+      // `name` is the output buffer: after a failed call its contents are
+      // unspecified (and possibly unterminated), so it must not be read
+      // back as error context.
+      return IOError("GetHostName", "", err);
+    }
+    return Status::OK();
+  }
+
+  // Stores the current time(), in seconds, in `*unix_time`.
+  // Returns IOError, leaving `*unix_time` untouched, if time() fails.
+  Status GetCurrentTime(int64_t* unix_time) override {
+    const time_t now = time(nullptr);
+    const bool failed = (now == static_cast<time_t>(-1));
+    if (failed) {
+      return IOError("GetCurrentTime", "", errno);
+    }
+    *unix_time = static_cast<int64_t>(now);
+    return Status::OK();
+  }
+
+  // Stores an absolute path in `*output_path`.
+  // A `db_path` that already starts with '/' is returned unchanged. For any
+  // other input (including an empty string) the current working directory
+  // is returned; `db_path` itself is not appended.
+  // Returns IOError carrying strerror(errno) if getcwd() fails.
+  Status GetAbsolutePath(const std::string& db_path,
+                         std::string* output_path) override {
+    if (!db_path.empty() && db_path[0] == '/') {
+      *output_path = db_path;
+      return Status::OK();
+    }
+
+    // A 256-byte buffer made getcwd() fail with ERANGE for any working
+    // directory deeper than 255 bytes; 4096 covers the usual PATH_MAX.
+    char the_path[4096];
+    char* ret = getcwd(the_path, sizeof(the_path));
+    if (ret == nullptr) {
+      return Status::IOError(strerror(errno));
+    }
+
+    *output_path = ret;
+    return Status::OK();
+  }
+
+  // Sets the number of worker threads of the pool serving `pri`, which must
+  // lie between Priority::BOTTOM and Priority::HIGH.
+  void SetBackgroundThreads(int num, Priority pri) override {
+    assert(Priority::BOTTOM <= pri && pri <= Priority::HIGH);
+    ThreadPoolImpl& pool = thread_pools_[pri];
+    pool.SetBackgroundThreads(num);
+  }
+
+  // Returns the number of worker threads configured for the pool serving
+  // `pri`, which must lie between Priority::BOTTOM and Priority::HIGH.
+  int GetBackgroundThreads(Priority pri) override {
+    assert(Priority::BOTTOM <= pri && pri <= Priority::HIGH);
+    ThreadPoolImpl& pool = thread_pools_[pri];
+    return pool.GetBackgroundThreads();
+  }
+
+  // Records whether files opened afterwards get a mode that lets non-owners
+  // access them; the flag is passed to GetDBFileMode() at open time.
+  // Always returns OK.
+  Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
+    this->allow_non_owner_access_ = allow_non_owner_access;
+    return Status::OK();
+  }
+
+  // Allow increasing the number of worker threads: forwards `num` to the
+  // pool serving `pri`, which must lie between Priority::BOTTOM and
+  // Priority::HIGH.
+  void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
+    assert(Priority::BOTTOM <= pri && pri <= Priority::HIGH);
+    ThreadPoolImpl& pool = thread_pools_[pri];
+    pool.IncBackgroundThreadsIfNeeded(num);
+  }
+
+  // Asks the pool serving `pool` to lower its I/O priority.
+  // Only acts when built with OS_LINUX; a no-op on other platforms.
+  void LowerThreadPoolIOPriority(Priority pool = LOW) override {
+    assert(Priority::BOTTOM <= pool && pool <= Priority::HIGH);
+#ifdef OS_LINUX
+    ThreadPoolImpl& target = thread_pools_[pool];
+    target.LowerIOPriority();
+#else
+    (void)pool;  // unused outside of the assert on this platform
+#endif
+  }
+
+  // Asks the pool serving `pool` to lower its CPU priority.
+  // Only acts when built with OS_LINUX; a no-op on other platforms.
+  void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
+    assert(Priority::BOTTOM <= pool && pool <= Priority::HIGH);
+#ifdef OS_LINUX
+    ThreadPoolImpl& target = thread_pools_[pool];
+    target.LowerCPUPriority();
+#else
+    (void)pool;  // unused outside of the assert on this platform
+#endif
+  }
+
+  // Formats `secondsSince1970` in local time as "YYYY/MM/DD-HH:MM:SS "
+  // (note the trailing space).
+  //
+  // The returned string holds only the formatted text; it is no longer
+  // padded with NUL bytes up to a fixed length of 64.
+  std::string TimeToString(uint64_t secondsSince1970) override {
+    const time_t seconds = static_cast<time_t>(secondsSince1970);
+    // Zero-initialized so the output is well defined even if localtime_r()
+    // cannot convert the value and leaves `t` untouched.
+    struct tm t = {};
+    localtime_r(&seconds, &t);
+
+    char buf[64];
+    const int written = snprintf(buf, sizeof(buf),
+                                 "%04d/%02d/%02d-%02d:%02d:%02d ",
+                                 t.tm_year + 1900,
+                                 t.tm_mon + 1,
+                                 t.tm_mday,
+                                 t.tm_hour,
+                                 t.tm_min,
+                                 t.tm_sec);
+    if (written < 0) {
+      return std::string();
+    }
+    // snprintf() reports the untruncated length; never read past the buffer.
+    size_t len = static_cast<size_t>(written);
+    if (len > sizeof(buf) - 1) {
+      len = sizeof(buf) - 1;
+    }
+    return std::string(buf, len);
+  }
+
+  // Returns a copy of `env_options` tuned for writing log files: mmap and
+  // direct writes are switched off, fallocate keeps the file size, and the
+  // sync interval and write-buffer cap are taken from `db_options`.
+  EnvOptions OptimizeForLogWrite(const EnvOptions& env_options,
+                                 const DBOptions& db_options) const override {
+    EnvOptions tuned(env_options);
+    tuned.use_direct_writes = false;
+    tuned.use_mmap_writes = false;
+    tuned.writable_file_max_buffer_size =
+        db_options.writable_file_max_buffer_size;
+    tuned.bytes_per_sync = db_options.wal_bytes_per_sync;
+    // TODO(icanadi) it's faster if fallocate_with_keep_size is false, but it
+    // breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
+    // test and make this false
+    tuned.fallocate_with_keep_size = true;
+    return tuned;
+  }
+
+  // Returns a copy of `env_options` tuned for writing manifest files: mmap
+  // and direct writes are switched off and fallocate keeps the file size.
+  EnvOptions OptimizeForManifestWrite(
+      const EnvOptions& env_options) const override {
+    EnvOptions tuned(env_options);
+    tuned.use_direct_writes = false;
+    tuned.use_mmap_writes = false;
+    tuned.fallocate_with_keep_size = true;
+    return tuned;
+  }
+
+ private:
+ bool checkedDiskForMmap_;  // Set once OpenWritableFile()/ReuseWritableFile() has probed the filesystem with SupportsFastAllocate().
+ bool forceMmapOff_; // do we override Env options? Set when that probe reports no fast-allocate support; mmap writes are then not used.
+
+  // Returns true iff `dname` can be stat()ed and is a directory.
+  // A failing stat() is reported as "does not exist".
+  virtual bool DirExists(const std::string& dname) {
+    struct stat dir_stat;
+    const bool stat_ok = (stat(dname.c_str(), &dir_stat) == 0);
+    return stat_ok && S_ISDIR(dir_stat.st_mode);
+  }
+
+  // Returns true when `path` lives on ext4, XFS or tmpfs, as reported by
+  // statfs(). Returns false when statfs() fails, for any other filesystem,
+  // and always when ROCKSDB_FALLOCATE_PRESENT is not defined.
+  bool SupportsFastAllocate(const std::string& path) {
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+    struct statfs fs_info;
+    if (statfs(path.c_str(), &fs_info) != 0) {
+      return false;
+    }
+    switch (fs_info.f_type) {
+      case EXT4_SUPER_MAGIC:
+      case XFS_SUPER_MAGIC:
+      case TMPFS_MAGIC:
+        return true;
+      default:
+        return false;
+    }
+#else
+    (void)path;
+    return false;
+#endif
+  }
+
+ size_t page_size_;  // getpagesize() captured by the constructor; handed to every PosixMmapFile.
+
+ std::vector<ThreadPoolImpl> thread_pools_;  // Priority::TOTAL pools, indexed by Env::Priority.
+ pthread_mutex_t mu_;  // Held by StartThread() while it appends to threads_to_join_.
+ std::vector<pthread_t> threads_to_join_;  // Threads created by StartThread(); joined by WaitForJoin().
+ // If true, allow non owner read access for db files. Otherwise, non-owner
+ // has no access to db files.
+ bool allow_non_owner_access_;
+};
+
+// Builds the environment: mmap probing state cleared, one thread pool per
+// priority, and non-owner file access allowed by default.
+PosixEnv::PosixEnv()
+    : checkedDiskForMmap_(false),
+      forceMmapOff_(false),
+      page_size_(getpagesize()),
+      thread_pools_(Priority::TOTAL),
+      allow_non_owner_access_(true) {
+  ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
+  int pool_id = 0;
+  for (ThreadPoolImpl& pool : thread_pools_) {
+    // Pool i serves priority i.
+    pool.SetThreadPriority(static_cast<Env::Priority>(pool_id));
+    // This allows later initializing the thread-local-env of each thread.
+    pool.SetHostEnv(this);
+    ++pool_id;
+  }
+  thread_status_updater_ = CreateThreadStatusUpdater();
+}
+
+// Queues function(arg) on the pool serving `pri` (BOTTOM..HIGH). `tag` and
+// `unschedFunction` are passed through to the pool unchanged.
+void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
+                        void* tag, void (*unschedFunction)(void* arg)) {
+  assert(Priority::BOTTOM <= pri && pri <= Priority::HIGH);
+  ThreadPoolImpl& pool = thread_pools_[pri];
+  pool.Schedule(function, arg, tag, unschedFunction);
+}
+
+// Forwards to the pool serving `pri` and returns the value it reports for
+// UnSchedule(arg). `pri` must lie between Priority::BOTTOM and
+// Priority::HIGH, as for Schedule() and GetThreadPoolQueueLen().
+int PosixEnv::UnSchedule(void* arg, Priority pri) {
+  // Same range check as the sibling pool accessors: an out-of-range
+  // priority would index past the pools that may be used for scheduling.
+  assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+  return thread_pools_[pri].UnSchedule(arg);
+}
+
+// Returns the queue length reported by the pool serving `pri`
+// (BOTTOM..HIGH).
+unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
+  assert(Priority::BOTTOM <= pri && pri <= Priority::HIGH);
+  const ThreadPoolImpl& pool = thread_pools_[pri];
+  return pool.GetQueueLen();
+}
+
+struct StartThreadState {  // Heap-allocated by PosixEnv::StartThread(); deleted by StartThreadWrapper().
+ void (*user_function)(void*);  // Function to run on the new thread.
+ void* arg;  // Opaque argument handed to user_function.
+};
+
+// pthread entry point used by PosixEnv::StartThread(). `arg` is a
+// heap-allocated StartThreadState: the wrapped function is run and the
+// state is freed afterwards. Always returns nullptr.
+static void* StartThreadWrapper(void* arg) {
+  StartThreadState* const state = static_cast<StartThreadState*>(arg);
+  void (*const entry)(void*) = state->user_function;
+  entry(state->arg);
+  delete state;
+  return nullptr;
+}
+
+// Runs function(arg) on a newly created pthread and records the thread in
+// threads_to_join_ (under mu_) so that WaitForJoin() can join it later.
+void PosixEnv::StartThread(void (*function)(void* arg), void* arg) {
+  // Ownership of the state passes to StartThreadWrapper, which deletes it.
+  StartThreadState* const state = new StartThreadState;
+  state->user_function = function;
+  state->arg = arg;
+
+  pthread_t thread;
+  const int create_rc =
+      pthread_create(&thread, nullptr, &StartThreadWrapper, state);
+  ThreadPoolImpl::PthreadCall("start thread", create_rc);
+
+  ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
+  threads_to_join_.push_back(thread);
+  ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+}
+
+// Joins every thread started through StartThread() so far and forgets them.
+//
+// StartThread() appends to threads_to_join_ while holding mu_, so the list
+// is taken over under the same mutex. The joins themselves run without the
+// mutex held, which keeps StartThread() usable while this call is blocked.
+// Threads started after the list was taken are left for the next call.
+void PosixEnv::WaitForJoin() {
+  std::vector<pthread_t> to_join;
+  ThreadPoolImpl::PthreadCall("lock", pthread_mutex_lock(&mu_));
+  to_join.swap(threads_to_join_);
+  ThreadPoolImpl::PthreadCall("unlock", pthread_mutex_unlock(&mu_));
+
+  for (const auto tid : to_join) {
+    pthread_join(tid, nullptr);
+  }
+}
+
+} // namespace
+
+// Returns an identifier intended to be unique.
+// The contents of /proc/sys/kernel/random/uuid are returned verbatim when
+// that file exists and can be read. Otherwise the id is "<nanos>-<random>",
+// both in hex, built from NowNanos() and a Random64 seeded with time().
+std::string Env::GenerateUniqueId() {
+  std::string uuid_file = "/proc/sys/kernel/random/uuid";
+
+  Status s = FileExists(uuid_file);
+  if (s.ok()) {
+    std::string uuid;
+    s = ReadFileToString(this, uuid_file, &uuid);
+    if (s.ok()) {
+      return uuid;
+    }
+  }
+  // Could not read uuid_file - generate uuid using "nanos-random"
+  Random64 r(time(nullptr));
+  uint64_t random_uuid_portion =
+      r.Uniform(std::numeric_limits<uint64_t>::max());
+  uint64_t nanos_uuid_portion = NowNanos();
+  char uuid2[200];
+  // unsigned long long is at least 64 bits wide everywhere, whereas
+  // unsigned long is 32 bits on some targets and would drop the upper half
+  // of both values.
+  snprintf(uuid2, sizeof(uuid2), "%llx-%llx",
+           static_cast<unsigned long long>(nanos_uuid_portion),
+           static_cast<unsigned long long>(random_uuid_portion));
+  return uuid2;
+}
+
+//
+// Default Posix Env
+//
+Env* Env::Default() {
+ // Returns the process-wide default Env: a function-local static PosixEnv.
+ //
+ // InitSingletons() below creates the ThreadLocalPtr singletons right
+ // before the static default_env is constructed. C++ destroys static
+ // variables in the reverse order of their construction, so default_env is
+ // always destructed first and the ThreadLocalPtr singletons afterwards.
+ // NOTE(review): the other two init calls presumably exist for the same
+ // ordering reason - confirm before moving them.
+ // Do not move the static declaration above these calls: that would invert
+ // the destruction order described here.
+ ThreadLocalPtr::InitSingletons();
+ CompressionContextCache::InitSingleton();
+ INIT_SYNC_POINT_SINGLETONS();
+ static PosixEnv default_env;
+ return &default_env;
+}
+
+} // namespace rocksdb
diff --git a/src/rocksdb/env/env_test.cc b/src/rocksdb/env/env_test.cc
new file mode 100644
index 00000000..47800928
--- /dev/null
+++ b/src/rocksdb/env/env_test.cc
@@ -0,0 +1,1774 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifndef OS_WIN
+#include <sys/ioctl.h>
+#endif
+
+#ifdef ROCKSDB_MALLOC_USABLE_SIZE
+#ifdef OS_FREEBSD
+#include <malloc_np.h>
+#else
+#include <malloc.h>
+#endif
+#endif
+#include <sys/types.h>
+
+#include <iostream>
+#include <unordered_set>
+#include <atomic>
+#include <list>
+
+#ifdef OS_LINUX
+#include <fcntl.h>
+#include <linux/fs.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#endif
+
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+#include <errno.h>
+#endif
+
+#include "env/env_chroot.h"
+#include "port/port.h"
+#include "rocksdb/env.h"
+#include "util/coding.h"
+#include "util/log_buffer.h"
+#include "util/mutexlock.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+#include "util/testharness.h"
+#include "util/testutil.h"
+
+#ifdef OS_LINUX
+static const size_t kPageSize = sysconf(_SC_PAGESIZE);
+#else
+static const size_t kPageSize = 4 * 1024;
+#endif
+
+namespace rocksdb {
+
+static const int kDelayMicros = 100000;
+
+// Deleter for std::unique_ptr that releases the pointer through a plain
+// function pointer chosen at construction time.
+struct Deleter {
+  explicit Deleter(void (*fn)(void*)) : fn_(fn) {}
+
+  // Releases `ptr` with the stored function; both must be non-null.
+  void operator()(void* ptr) {
+    assert(fn_ != nullptr);
+    assert(ptr != nullptr);
+    fn_(ptr);
+  }
+
+  void (*fn_)(void*);
+};
+
+// Allocates `size` bytes aligned to kPageSize and fills them with `ch`.
+// Returns an empty pointer if the aligned allocation fails. The returned
+// pointer frees the memory with the function matching the allocator used.
+std::unique_ptr<char, Deleter> NewAligned(const size_t size, const char ch) {
+#ifdef OS_WIN
+  void (*const release)(void*) = _aligned_free;
+  char* buffer = reinterpret_cast<char*>(_aligned_malloc(size, kPageSize));
+  if (buffer == nullptr) {
+    return std::unique_ptr<char, Deleter>(nullptr, Deleter(release));
+  }
+#else
+  void (*const release)(void*) = free;
+  char* buffer = nullptr;
+  if (posix_memalign(reinterpret_cast<void**>(&buffer), kPageSize, size) != 0) {
+    return std::unique_ptr<char, Deleter>(nullptr, Deleter(release));
+  }
+#endif
+  std::unique_ptr<char, Deleter> owned(buffer, Deleter(release));
+  memset(owned.get(), ch, size);
+  return owned;
+}
+
+// Base fixture: tests run against Env::Default() with direct I/O disabled
+// unless a derived fixture overrides env_ / direct_io_.
+class EnvPosixTest : public testing::Test {
+ private:
+  port::Mutex mu_;
+  std::string events_;
+
+ public:
+  Env* env_ = Env::Default();
+  bool direct_io_ = false;
+
+  EnvPosixTest() = default;
+};
+
+// Parameterized fixture: each parameter is an (Env*, use direct I/O) pair
+// that replaces the defaults set up by EnvPosixTest.
+class EnvPosixTestWithParam
+    : public EnvPosixTest,
+      public ::testing::WithParamInterface<std::pair<Env*, bool>> {
+ public:
+  EnvPosixTestWithParam() {
+    const std::pair<Env*, bool>& param = GetParam();
+    env_ = param.first;
+    direct_io_ = param.second;
+  }
+
+  // Blocks until the LOW and then the HIGH pool queue of env_ is empty,
+  // polling every kDelayMicros.
+  void WaitThreadPoolsEmpty() {
+    const Env::Priority polled_pools[] = {Env::Priority::LOW,
+                                          Env::Priority::HIGH};
+    for (const Env::Priority pool : polled_pools) {
+      while (env_->GetThreadPoolQueueLen(pool) != 0) {
+        Env::Default()->SleepForMicroseconds(kDelayMicros);
+      }
+    }
+  }
+
+  // Do not let queued work outlive the fixture.
+  ~EnvPosixTestWithParam() override { WaitThreadPoolsEmpty(); }
+};
+
+// Thread/pool callback: `ptr` must point to a std::atomic<bool>, which is
+// set to true.
+static void SetBool(void* ptr) {
+  std::atomic<bool>* const flag = reinterpret_cast<std::atomic<bool>*>(ptr);
+  flag->store(true);
+}
+
+// For every pool: with one worker thread, a scheduled task is expected to
+// have run after sleeping kDelayMicros.
+TEST_F(EnvPosixTest, DISABLED_RunImmediately) {
+  for (int pri = Env::BOTTOM; pri < Env::TOTAL; ++pri) {
+    const Env::Priority priority = static_cast<Env::Priority>(pri);
+    std::atomic<bool> task_ran(false);
+    env_->SetBackgroundThreads(1, priority);
+    env_->Schedule(&SetBool, &task_ran, priority);
+    Env::Default()->SleepForMicroseconds(kDelayMicros);
+    ASSERT_TRUE(task_ran.load());
+  }
+}
+
+// A thread started with StartThread() has run by the time WaitForJoin()
+// returns.
+TEST_F(EnvPosixTest, RunEventually) {
+  std::atomic<bool> thread_ran(false);
+  env_->StartThread(&SetBool, &thread_ran);
+  env_->WaitForJoin();
+  ASSERT_TRUE(thread_ran.load());
+}
+
+#ifdef OS_WIN
+// A file and a hard link to it must be reported as the same file.
+// Skipped when the Env does not support AreFilesSame().
+TEST_F(EnvPosixTest, AreFilesSame) {
+  {
+    bool tmp;
+    if (env_->AreFilesSame("", "", &tmp).IsNotSupported()) {
+      fprintf(stderr,
+              "skipping EnvPosixTest.AreFilesSame due to "
+              "unsupported Env::AreFilesSame\n");
+      return;
+    }
+  }
+
+  const EnvOptions soptions;
+  auto* env = Env::Default();
+  std::string same_file_name = test::PerThreadDBPath(env, "same_file");
+  std::string same_file_link_name = same_file_name + "_link";
+
+  std::unique_ptr<WritableFile> same_file;
+  ASSERT_OK(env->NewWritableFile(same_file_name,
+                                 &same_file, soptions));
+  // A failed write would leave nothing meaningful to link, so check it.
+  ASSERT_OK(same_file->Append("random_data"));
+  ASSERT_OK(same_file->Flush());
+  same_file.reset();
+
+  ASSERT_OK(env->LinkFile(same_file_name, same_file_link_name));
+  bool result = false;
+  ASSERT_OK(env->AreFilesSame(same_file_name, same_file_link_name, &result));
+  ASSERT_TRUE(result);
+}
+#endif
+
+#ifdef OS_LINUX
+// Files are created with mode 0644 by default and with 0600 once non-owner
+// access has been disallowed.
+TEST_F(EnvPosixTest, DISABLED_FilePermission) {
+  // Only works for Linux environment
+  if (env_ == Env::Default()) {
+    EnvOptions soptions;
+    std::vector<std::string> fileNames{
+        test::PerThreadDBPath(env_, "testfile"),
+        test::PerThreadDBPath(env_, "testfile1")};
+    std::unique_ptr<WritableFile> wfile;
+    ASSERT_OK(env_->NewWritableFile(fileNames[0], &wfile, soptions));
+    ASSERT_OK(env_->NewWritableFile(fileNames[1], &wfile, soptions));
+    wfile.reset();
+    std::unique_ptr<RandomRWFile> rwfile;
+    ASSERT_OK(env_->NewRandomRWFile(fileNames[1], &rwfile, soptions));
+
+    struct stat sb;
+    for (const auto& filename : fileNames) {
+      if (::stat(filename.c_str(), &sb) == 0) {
+        ASSERT_EQ(sb.st_mode & 0777, 0644);
+      }
+      env_->DeleteFile(filename);
+    }
+
+    env_->SetAllowNonOwnerAccess(false);
+    ASSERT_OK(env_->NewWritableFile(fileNames[0], &wfile, soptions));
+    ASSERT_OK(env_->NewWritableFile(fileNames[1], &wfile, soptions));
+    wfile.reset();
+    ASSERT_OK(env_->NewRandomRWFile(fileNames[1], &rwfile, soptions));
+
+    for (const auto& filename : fileNames) {
+      if (::stat(filename.c_str(), &sb) == 0) {
+        ASSERT_EQ(sb.st_mode & 0777, 0600);
+      }
+      env_->DeleteFile(filename);
+    }
+
+    // env_ is the shared default Env here: put the flag back to the value
+    // the environment is constructed with so later tests are unaffected.
+    env_->SetAllowNonOwnerAccess(true);
+  }
+}
+#endif
+
+// Writes 32 KB of random data to a file, maps it through
+// NewMemoryMappedFileBuffer() and checks the mapping matches what was
+// written.
+TEST_F(EnvPosixTest, MemoryMappedFileBuffer) {
+  const int kFileBytes = 1 << 15;  // 32 KB
+  const std::string fname = test::PerThreadDBPath(env_, "testfile");
+  std::string expected_data;
+  {
+    // The writable file is destroyed at the end of this scope, before the
+    // file is mapped.
+    const EnvOptions soptions;
+    std::unique_ptr<WritableFile> wfile;
+    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
+
+    Random rnd(301);
+    test::RandomString(&rnd, kFileBytes, &expected_data);
+    ASSERT_OK(wfile->Append(expected_data));
+  }
+
+  std::unique_ptr<MemoryMappedFileBuffer> mmap_buffer;
+  const Status status = env_->NewMemoryMappedFileBuffer(fname, &mmap_buffer);
+  // it should be supported at least on linux
+#if !defined(OS_LINUX)
+  if (status.IsNotSupported()) {
+    fprintf(stderr,
+            "skipping EnvPosixTest.MemoryMappedFileBuffer due to "
+            "unsupported Env::NewMemoryMappedFileBuffer\n");
+    return;
+  }
+#endif  // !defined(OS_LINUX)
+
+  ASSERT_OK(status);
+  ASSERT_NE(nullptr, mmap_buffer.get());
+  ASSERT_NE(nullptr, mmap_buffer->GetBase());
+  ASSERT_EQ(kFileBytes, mmap_buffer->GetLen());
+  const char* const mapped =
+      reinterpret_cast<const char*>(mmap_buffer->GetBase());
+  std::string actual_data(mapped, mmap_buffer->GetLen());
+  ASSERT_EQ(expected_data, actual_data);
+}
+
+// UnSchedule() removes only tasks queued under the given tag, and the pool
+// keeps working afterwards.
+TEST_P(EnvPosixTestWithParam, UnSchedule) {
+  std::atomic<bool> called(false);
+  env_->SetBackgroundThreads(1, Env::LOW);
+
+  // Occupy the single LOW thread so the next task has to stay queued.
+  test::SleepingBackgroundTask blocker, queued_task;
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &blocker,
+                 Env::Priority::LOW);
+
+  // Queue a second task, tagged with its own address.
+  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &queued_task,
+                 Env::Priority::LOW, &queued_task);
+
+  // A tag nothing was scheduled under removes nothing.
+  ASSERT_EQ(0, env_->UnSchedule(&called, Env::Priority::LOW));
+
+  // The right tag removes exactly the queued task.
+  ASSERT_EQ(1, env_->UnSchedule(&queued_task, Env::Priority::LOW));
+
+  // Unblock background thread
+  blocker.WakeUp();
+
+  // The pool must still run newly scheduled work: poll for the flag.
+  env_->Schedule(&SetBool, &called);
+  int polls_left = kDelayMicros;
+  while (polls_left-- > 0 && !called.load()) {
+    Env::Default()->SleepForMicroseconds(1);
+  }
+  ASSERT_TRUE(called.load());
+
+  ASSERT_TRUE(!blocker.IsSleeping() && !queued_task.IsSleeping());
+  WaitThreadPoolsEmpty();
+}
+
+// This test assumes that the last scheduled
+// task will run last. In fact, within the allotted
+// sleeping time nothing may actually run, or the tasks may
+// run in any order. The purpose of the test is unclear.
+#ifndef OS_WIN
+TEST_P(EnvPosixTestWithParam, RunMany) {
+ std::atomic<int> last_id(0);
+
+ struct CB {
+ std::atomic<int>* last_id_ptr; // Pointer to shared slot
+ int id; // Order# for the execution of this callback
+
+ CB(std::atomic<int>* p, int i) : last_id_ptr(p), id(i) {}
+
+ static void Run(void* v) {
+ CB* cb = reinterpret_cast<CB*>(v);
+ int cur = cb->last_id_ptr->load();
+ ASSERT_EQ(cb->id - 1, cur);
+ cb->last_id_ptr->store(cb->id);
+ }
+ };
+
+ // Schedule in different order than start time
+ CB cb1(&last_id, 1);
+ CB cb2(&last_id, 2);
+ CB cb3(&last_id, 3);
+ CB cb4(&last_id, 4);
+ env_->Schedule(&CB::Run, &cb1);
+ env_->Schedule(&CB::Run, &cb2);
+ env_->Schedule(&CB::Run, &cb3);
+ env_->Schedule(&CB::Run, &cb4);
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ int cur = last_id.load(std::memory_order_acquire);
+ ASSERT_EQ(4, cur);
+ WaitThreadPoolsEmpty();
+}
+#endif
+
+struct State {  // Shared between the StartThread test and ThreadBody.
+ port::Mutex mu;  // held while val and num_running are updated
+ int val;  // incremented once per ThreadBody call
+ int num_running;  // decremented once per ThreadBody call
+};
+
+static void ThreadBody(void* arg) {  // Thread entry point; arg is a State*.
+  auto* const shared = static_cast<State*>(arg);
+  shared->mu.Lock();
+  ++shared->val;
+  --shared->num_running;
+  shared->mu.Unlock();
+}
+
+TEST_P(EnvPosixTestWithParam, StartThread) {
+  // Three threads each run ThreadBody once against the same State.
+  State state;
+  state.val = 0;
+  state.num_running = 3;
+  for (int launched = 0; launched != 3; ++launched) {
+    env_->StartThread(&ThreadBody, &state);
+  }
+  // Poll num_running under the mutex, sleeping between polls, until it
+  // reaches zero.
+  for (int remaining = -1; remaining != 0;) {
+    state.mu.Lock();
+    remaining = state.num_running;
+    state.mu.Unlock();
+    if (remaining != 0) Env::Default()->SleepForMicroseconds(kDelayMicros);
+  }
+  ASSERT_EQ(state.val, 3);
+  WaitThreadPoolsEmpty();
+}
+
+TEST_P(EnvPosixTestWithParam, TwoPools) {  // LOW and HIGH pools side by side
+ // Data structures to signal tasks to run.
+ port::Mutex mutex;
+ port::CondVar cv(&mutex);
+ bool should_start = false;
+
+ class CB {  // Job body shared by every task scheduled into one pool.
+ public:
+ CB(const std::string& pool_name, int pool_size, port::Mutex* trigger_mu,
+ port::CondVar* trigger_cv, bool* _should_start)
+ : mu_(),
+ num_running_(0),
+ num_finished_(0),
+ pool_size_(pool_size),
+ pool_name_(pool_name),
+ trigger_mu_(trigger_mu),
+ trigger_cv_(trigger_cv),
+ should_start_(_should_start) {}
+
+ static void Run(void* v) {  // Schedule() entry point; v is the CB
+ CB* cb = reinterpret_cast<CB*>(v);
+ cb->Run();
+ }
+
+ void Run() {  // count in, wait for the trigger, count out
+ {
+ MutexLock l(&mu_);
+ num_running_++;
+ // make sure we don't have more than pool_size_ jobs running.
+ ASSERT_LE(num_running_, pool_size_.load());
+ }
+
+ {
+ MutexLock l(trigger_mu_);
+ while (!(*should_start_)) {
+ trigger_cv_->Wait();
+ }
+ }
+
+ {
+ MutexLock l(&mu_);
+ num_running_--;
+ num_finished_++;
+ }
+ }
+
+ int NumFinished() {
+ MutexLock l(&mu_);
+ return num_finished_;
+ }
+
+ void Reset(int pool_size) {  // NOTE(review): writes num_finished_ without mu_
+ pool_size_.store(pool_size);
+ num_finished_ = 0;
+ }
+
+ private:
+ port::Mutex mu_;  // taken around num_running_ / num_finished_ in Run()
+ int num_running_;
+ int num_finished_;
+ std::atomic<int> pool_size_;
+ std::string pool_name_;  // stored but never read inside CB
+ port::Mutex* trigger_mu_;
+ port::CondVar* trigger_cv_;
+ bool* should_start_;
+ };
+
+ const int kLowPoolSize = 2;
+ const int kHighPoolSize = 4;
+ const int kJobs = 8;
+
+ CB low_pool_job("low", kLowPoolSize, &mutex, &cv, &should_start);
+ CB high_pool_job("high", kHighPoolSize, &mutex, &cv, &should_start);
+
+ env_->SetBackgroundThreads(kLowPoolSize);
+ env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);
+
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+ // schedule same number of jobs in each pool
+ for (int i = 0; i < kJobs; i++) {
+ env_->Schedule(&CB::Run, &low_pool_job);
+ env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
+ }
+ // Wait a short while (at most ~100 polls) for the jobs to be dispatched.
+ int sleep_count = 0;
+ while ((unsigned int)(kJobs - kLowPoolSize) !=
+ env_->GetThreadPoolQueueLen(Env::Priority::LOW) ||
+ (unsigned int)(kJobs - kHighPoolSize) !=
+ env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) {
+ env_->SleepForMicroseconds(kDelayMicros);
+ if (++sleep_count > 100) {
+ break;
+ }
+ }
+
+ ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
+ env_->GetThreadPoolQueueLen());
+ ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
+ env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+ ASSERT_EQ((unsigned int)(kJobs - kHighPoolSize),
+ env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+ // Trigger jobs to run.
+ {
+ MutexLock l(&mutex);
+ should_start = true;
+ cv.SignalAll();
+ }
+
+ // wait for all jobs to finish
+ while (low_pool_job.NumFinished() < kJobs ||
+ high_pool_job.NumFinished() < kJobs) {
+ env_->SleepForMicroseconds(kDelayMicros);
+ }
+
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+ // Make newly scheduled jobs block again until the next trigger.
+ should_start = false;
+
+ // Call IncBackgroundThreadsIfNeeded on both pools: a larger size for HIGH, a
+ // smaller one for LOW (the checks below expect LOW to stay at kLowPoolSize).
+ env_->IncBackgroundThreadsIfNeeded(kLowPoolSize - 1, Env::Priority::LOW);
+ env_->IncBackgroundThreadsIfNeeded(kHighPoolSize + 1, Env::Priority::HIGH);
+ high_pool_job.Reset(kHighPoolSize + 1);
+ low_pool_job.Reset(kLowPoolSize);
+
+ // schedule same number of jobs in each pool
+ for (int i = 0; i < kJobs; i++) {
+ env_->Schedule(&CB::Run, &low_pool_job);
+ env_->Schedule(&CB::Run, &high_pool_job, Env::Priority::HIGH);
+ }
+ // Wait a short while (at most ~100 polls) for the jobs to be dispatched.
+ sleep_count = 0;
+ while ((unsigned int)(kJobs - kLowPoolSize) !=
+ env_->GetThreadPoolQueueLen(Env::Priority::LOW) ||
+ (unsigned int)(kJobs - (kHighPoolSize + 1)) !=
+ env_->GetThreadPoolQueueLen(Env::Priority::HIGH)) {
+ env_->SleepForMicroseconds(kDelayMicros);
+ if (++sleep_count > 100) {
+ break;
+ }
+ }
+ ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
+ env_->GetThreadPoolQueueLen());
+ ASSERT_EQ((unsigned int)(kJobs - kLowPoolSize),
+ env_->GetThreadPoolQueueLen(Env::Priority::LOW));
+ ASSERT_EQ((unsigned int)(kJobs - (kHighPoolSize + 1)),
+ env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+ // Trigger jobs to run.
+ {
+ MutexLock l(&mutex);
+ should_start = true;
+ cv.SignalAll();
+ }
+
+ // wait for all jobs to finish
+ while (low_pool_job.NumFinished() < kJobs ||
+ high_pool_job.NumFinished() < kJobs) {
+ env_->SleepForMicroseconds(kDelayMicros);
+ }
+
+ env_->SetBackgroundThreads(kHighPoolSize, Env::Priority::HIGH);  // restore
+ WaitThreadPoolsEmpty();
+}
+
+TEST_P(EnvPosixTestWithParam, DecreaseNumBgThreads) {
+ std::vector<test::SleepingBackgroundTask> tasks(10);
+
+ // Set the number of threads to 1 first.
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+
+ // Schedule 3 tasks. Task 0 running; Task 1, 2 waiting.
+ for (size_t i = 0; i < 3; i++) {
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
+ Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ }
+ ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(!tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Increase to 2 threads. Task 0, 1 running; 2 waiting
+ env_->SetBackgroundThreads(2, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Task 1 finishes. Task 0 running, 2 still waiting.
+ tasks[1].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(1U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(!tasks[1].IsSleeping());
+ ASSERT_TRUE(!tasks[2].IsSleeping());
+
+ // Increase to 5 threads. Task 0 and 2 running.
+ env_->SetBackgroundThreads(5, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[0].IsSleeping());
+ ASSERT_TRUE(tasks[2].IsSleeping());
+
+ // Change the number of threads a couple of times while there are not
+ // enough tasks to occupy them; the queue must stay empty throughout.
+ env_->SetBackgroundThreads(7, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ tasks[2].WakeUp();
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(3, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(5, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(0U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros * 50);
+
+ // Enqueue 5 more tasks. Thread pool size now is 4.
+ // Task 0, 3, 4, 5 running; 6, 7 waiting.
+ for (size_t i = 3; i < 8; i++) {
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[i],
+ Env::Priority::HIGH);
+ }
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ(2U, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[3].IsSleeping());
+ ASSERT_TRUE(tasks[4].IsSleeping());
+ ASSERT_TRUE(tasks[5].IsSleeping());
+ ASSERT_TRUE(!tasks[6].IsSleeping());
+ ASSERT_TRUE(!tasks[7].IsSleeping());
+
+ // Wake up task 0, 3 and 4. Task 5, 6, 7 running.
+ tasks[0].WakeUp();
+ tasks[3].WakeUp();
+ tasks[4].WakeUp();
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ for (size_t i = 5; i < 8; i++) {
+ ASSERT_TRUE(tasks[i].IsSleeping());
+ }
+
+ // Shrink back to 1 thread. Still task 5, 6, 7 running
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(tasks[5].IsSleeping());
+ ASSERT_TRUE(tasks[6].IsSleeping());
+ ASSERT_TRUE(tasks[7].IsSleeping());
+
+ // Wake up task 6. Task 5, 7 running
+ tasks[6].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(tasks[5].IsSleeping());
+ ASSERT_TRUE(!tasks[6].IsSleeping());
+ ASSERT_TRUE(tasks[7].IsSleeping());
+
+ // Wake up task 7. Task 5 running
+ tasks[7].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[7].IsSleeping());
+
+ // Enqueue tasks 8 and 9. Task 5 running; one of 8, 9 might be running.
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[8],
+ Env::Priority::HIGH);
+ env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &tasks[9],
+ Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_GT(env_->GetThreadPoolQueueLen(Env::Priority::HIGH), (unsigned int)0);
+ ASSERT_TRUE(!tasks[8].IsSleeping() || !tasks[9].IsSleeping());
+
+ // Increase to 4 threads. Task 5, 8, 9 running.
+ env_->SetBackgroundThreads(4, Env::Priority::HIGH);
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_EQ((unsigned int)0, env_->GetThreadPoolQueueLen(Env::Priority::HIGH));
+ ASSERT_TRUE(tasks[8].IsSleeping());
+ ASSERT_TRUE(tasks[9].IsSleeping());
+
+ // Shrink to 1 thread
+ env_->SetBackgroundThreads(1, Env::Priority::HIGH);
+
+ // Wake up task 9.
+ tasks[9].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[9].IsSleeping());
+ ASSERT_TRUE(tasks[8].IsSleeping());
+
+ // Wake up task 8
+ tasks[8].WakeUp();
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[8].IsSleeping());
+
+ // Wake up the last sleeping task (task 5)
+ tasks[5].WakeUp();
+
+ Env::Default()->SleepForMicroseconds(kDelayMicros);
+ ASSERT_TRUE(!tasks[5].IsSleeping());
+ WaitThreadPoolsEmpty();
+}
+
+#if (defined OS_LINUX || defined OS_WIN)
+// Travis doesn't support fallocate or getting unique ID from files for whatever
+// reason.
+#ifndef TRAVIS
+
+namespace {
+// Returns true when s decodes as exactly one varint64 with nothing left over.
+bool IsSingleVarint(const std::string& s) {
+  Slice remaining(s);
+  uint64_t decoded;
+  const bool parsed = GetVarint64(&remaining, &decoded);
+  return parsed && remaining.size() == 0;
+}
+
+// A usable unique ID is non-empty and is not just a single varint.
+bool IsUniqueIDValid(const std::string& s) {
+  if (s.empty()) {
+    return false;
+  }
+  return !IsSingleVarint(s);
+}
+
+// Scratch buffer (and its capacity) that GetUniqueId() results are read into.
+const size_t MAX_ID_SIZE = 100;
+char temp_id[MAX_ID_SIZE];
+} // namespace
+
+// Determine whether we can use the FS_IOC_GETVERSION ioctl
+// on a file in directory DIR. Create a temporary file therein,
+// try to apply the ioctl (save that result), cleanup and
+// return the result. Return true if it is supported, and
+// false if anything fails.
+// Note that this function "knows" that dir has just been created
+// and is empty, so we create a simply-named test file: "f".
+bool ioctl_support__FS_IOC_GETVERSION(const std::string& dir) {
+#ifdef OS_WIN
+  return true;
+#else
+  const std::string file = dir + "/f";
+  int fd;
+  do {
+    fd = open(file.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+  } while (fd < 0 && errno == EINTR);
+  long int version = 0;
+  const bool ok = (fd >= 0 && ioctl(fd, FS_IOC_GETVERSION, &version) >= 0);
+  // Only close a descriptor that open() actually returned.
+  if (fd >= 0) close(fd);
+  unlink(file.c_str());
+
+  return ok;
+#endif
+}
+
+// To ensure that Env::GetUniqueId-related tests work correctly, the files
+// should be stored in regular storage like "hard disk" or "flash device",
+// and not on a tmpfs file system (like /dev/shm and /tmp on some systems).
+// Otherwise we cannot get the correct id.
+//
+// This function serves as the replacement for test::TmpDir(), which may be
+// customized to be on a file system that doesn't work with GetUniqueId().
+
+class IoctlFriendlyTmpdir {  // Creates a temp dir; rmdir()s it on destruction.
+ public:
+ explicit IoctlFriendlyTmpdir() {
+ char dir_buf[100];
+
+ const char *fmt = "%s/rocksdb.XXXXXX";
+ const char *tmp = getenv("TEST_IOCTL_FRIENDLY_TMPDIR");
+
+#ifdef OS_WIN
+#define rmdir _rmdir  // also affects the rmdir call in the destructor
+ if(tmp == nullptr) {
+ tmp = getenv("TMP");
+ }
+ // NOTE(review): tmp may still be null here if TMP is unset too - confirm.
+ snprintf(dir_buf, sizeof dir_buf, fmt, tmp);
+ auto result = _mktemp(dir_buf);
+ assert(result != nullptr);
+ BOOL ret = CreateDirectory(dir_buf, NULL);
+ assert(ret == TRUE);
+ dir_ = dir_buf;
+#else
+ std::list<std::string> candidate_dir_list = {"/var/tmp", "/tmp"};
+
+ // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use
+ // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and
+ // add 1 for the trailing NUL byte.
+ if (tmp && strlen(tmp) + strlen(fmt) - 2 + 1 <= sizeof dir_buf) {
+ // use $TEST_IOCTL_FRIENDLY_TMPDIR value (tried before the built-in ones)
+ candidate_dir_list.push_front(tmp);
+ }
+ // Take the first candidate where mkdtemp and the ioctl probe both succeed.
+ for (const std::string& d : candidate_dir_list) {
+ snprintf(dir_buf, sizeof dir_buf, fmt, d.c_str());
+ if (mkdtemp(dir_buf)) {
+ if (ioctl_support__FS_IOC_GETVERSION(dir_buf)) {
+ dir_ = dir_buf;
+ return;
+ } else {
+ // Diagnose ioctl-related failure only if this is the
+ // directory specified via that envvar.
+ if (tmp && tmp == d) {
+ fprintf(stderr, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is "
+ "not suitable: %s\n", d.c_str());
+ }
+ rmdir(dir_buf); // ignore failure
+ }
+ } else {
+ // mkdtemp failed: diagnose it, but don't give up.
+ fprintf(stderr, "mkdtemp(%s/...) failed: %s\n", d.c_str(),
+ strerror(errno));
+ }
+ }
+ // No candidate worked: report it and abort the test binary.
+ fprintf(stderr, "failed to find an ioctl-friendly temporary directory;"
+ " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n");
+ std::abort();
+#endif
+}
+ // Removes the directory; the rmdir result is ignored.
+ ~IoctlFriendlyTmpdir() {
+ rmdir(dir_.c_str());
+ }
+ // Path of the directory created by the constructor.
+ const std::string& name() const {
+ return dir_;
+ }
+
+ private:
+ std::string dir_;
+};
+
+#ifndef ROCKSDB_LITE
+TEST_F(EnvPosixTest, PositionedAppend) {
+ std::unique_ptr<WritableFile> writable_file;
+ EnvOptions options;
+ options.use_direct_writes = true;
+ options.use_mmap_writes = false;
+ IoctlFriendlyTmpdir ift;  // NOTE(review): "/f" is never deleted below
+ ASSERT_OK(env_->NewWritableFile(ift.name() + "/f", &writable_file, options));
+ const size_t kBlockSize = 4096;
+ const size_t kDataSize = kPageSize;
+ // Write a page worth of 'a' at offset 0
+ auto data_ptr = NewAligned(kDataSize, 'a');
+ Slice data_a(data_ptr.get(), kDataSize);
+ ASSERT_OK(writable_file->PositionedAppend(data_a, 0U));
+ // Write a page worth of 'b' starting at offset kBlockSize
+ data_ptr = NewAligned(kDataSize, 'b');
+ Slice data_b(data_ptr.get(), kDataSize);
+ ASSERT_OK(writable_file->PositionedAppend(data_b, kBlockSize));
+ ASSERT_OK(writable_file->Close());
+ // The file now has kBlockSize bytes of 'a' followed by a page worth of 'b'
+
+ // Verify the above
+ std::unique_ptr<SequentialFile> seq_file;
+ ASSERT_OK(env_->NewSequentialFile(ift.name() + "/f", &seq_file, options));
+ char scratch[kPageSize * 2];
+ Slice result;
+ ASSERT_OK(seq_file->Read(sizeof(scratch), &result, scratch));
+ ASSERT_EQ(kPageSize + kBlockSize, result.size());
+ ASSERT_EQ('a', result[kBlockSize - 1]);  // last byte of the 'a' region
+ ASSERT_EQ('b', result[kBlockSize]);  // first byte of the 'b' region
+}
+#endif // !ROCKSDB_LITE
+
+// A file's unique ID must be valid and stay the same across re-opens.
+TEST_P(EnvPosixTestWithParam, RandomAccessUniqueID) {
+  // Only the default Env is exercised.
+  if (env_ == Env::Default()) {
+    EnvOptions soptions;
+    soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
+    IoctlFriendlyTmpdir ift;
+    std::string fname = ift.name() + "/testfile";
+    // Create file.
+    std::unique_ptr<WritableFile> wfile;
+    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
+
+    std::unique_ptr<RandomAccessFile> file;
+    // Get Unique ID
+    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+    size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
+    ASSERT_TRUE(id_size > 0);
+    std::string unique_id1(temp_id, id_size);
+    ASSERT_TRUE(IsUniqueIDValid(unique_id1));
+
+    // Get Unique ID again
+    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+    id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
+    ASSERT_TRUE(id_size > 0);
+    std::string unique_id2(temp_id, id_size);
+    ASSERT_TRUE(IsUniqueIDValid(unique_id2));
+
+    // Get Unique ID again after waiting some time.
+    env_->SleepForMicroseconds(1000000);
+    ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+    id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
+    ASSERT_TRUE(id_size > 0);
+    std::string unique_id3(temp_id, id_size);
+    ASSERT_TRUE(IsUniqueIDValid(unique_id3));
+
+    // Check IDs are the same.
+    ASSERT_EQ(unique_id1, unique_id2);
+    ASSERT_EQ(unique_id2, unique_id3);
+
+    // Delete the file and check the result (ift's destructor rmdir()s the dir)
+    ASSERT_OK(env_->DeleteFile(fname));
+  }
+}
+
+// only works in linux platforms
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+TEST_P(EnvPosixTestWithParam, AllocateTest) {  // Preallocate; trim on close
+  if (env_ == Env::Default()) {
+    IoctlFriendlyTmpdir ift;
+    std::string fname = ift.name() + "/preallocate_testfile";
+
+    // Try fallocate in a file to see whether the target file system supports
+    // it.
+    // Skip the test if fallocate is not supported.
+    std::string fname_test_fallocate = ift.name() + "/preallocate_testfile_2";
+    int fd = -1;
+    do {
+      fd = open(fname_test_fallocate.c_str(), O_CREAT | O_RDWR | O_TRUNC, 0644);
+    } while (fd < 0 && errno == EINTR);
+    ASSERT_GE(fd, 0);  // 0 is a valid descriptor; only negatives are errors
+
+    int alloc_status = fallocate(fd, 0, 0, 1);
+
+    int err_number = 0;
+    if (alloc_status != 0) {
+      err_number = errno;
+      fprintf(stderr, "Warning: fallocate() fails, %s\n", strerror(err_number));
+    }
+    close(fd);
+    ASSERT_OK(env_->DeleteFile(fname_test_fallocate));
+    if (alloc_status != 0 && err_number == EOPNOTSUPP) {
+      // The filesystem containing the file does not support fallocate
+      return;
+    }
+
+    EnvOptions soptions;
+    soptions.use_mmap_writes = false;
+    soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
+    std::unique_ptr<WritableFile> wfile;
+    ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
+
+    // allocate 100 MB
+    size_t kPreallocateSize = 100 * 1024 * 1024;
+    size_t kBlockSize = 512;
+    size_t kPageSize = 4096;
+    size_t kDataSize = 1024 * 1024;
+    auto data_ptr = NewAligned(kDataSize, 'A');
+    Slice data(data_ptr.get(), kDataSize);
+    wfile->SetPreallocationBlockSize(kPreallocateSize);
+    wfile->PrepareWrite(wfile->GetFileSize(), kDataSize);
+    ASSERT_OK(wfile->Append(data));
+    ASSERT_OK(wfile->Flush());
+
+    struct stat f_stat;
+    ASSERT_EQ(stat(fname.c_str(), &f_stat), 0);
+    ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
+    // verify that blocks are preallocated
+    // Note here that we don't check the exact number of blocks preallocated --
+    // we only require that number of allocated blocks is at least what we
+    // expect.
+    // It looks like some FS give us more blocks that we asked for. That's fine.
+    // It might be worth investigating further.
+    ASSERT_LE((unsigned int)(kPreallocateSize / kBlockSize), f_stat.st_blocks);
+
+    // close the file, should deallocate the blocks
+    wfile.reset();
+
+    ASSERT_EQ(stat(fname.c_str(), &f_stat), 0);  // don't read a stale f_stat
+    ASSERT_EQ((unsigned int)kDataSize, f_stat.st_size);
+    // verify that preallocated blocks were deallocated on file close
+    // Because the FS might give us more blocks, we add a full page to the size
+    // and expect the number of blocks to be less or equal to that.
+    ASSERT_GE((f_stat.st_size + kPageSize + kBlockSize - 1) / kBlockSize,
+              (unsigned int)f_stat.st_blocks);
+  }
+}
+#endif // ROCKSDB_FALLOCATE_PRESENT
+
+// Reports whether any member of ss is a proper prefix of another member.
+// An empty member counts as a prefix of everything and yields true at once.
+bool HasPrefix(const std::unordered_set<std::string>& ss) {
+  for (const auto& candidate : ss) {
+    if (candidate.empty()) {
+      return true;
+    }
+    // Look up each non-empty proper prefix of candidate in the set.
+    for (size_t len = 1; len < candidate.size(); ++len) {
+      if (ss.find(candidate.substr(0, len)) != ss.end()) return true;
+    }
+  }
+  return false;
+}
+
+// Only works in linux and WIN platforms
+TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDConcurrent) {
+  if (env_ == Env::Default()) {
+    // Check whether a bunch of concurrently existing files have unique IDs.
+    EnvOptions soptions;
+    soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
+
+    // Create the files
+    IoctlFriendlyTmpdir ift;
+    std::vector<std::string> fnames;
+    for (int i = 0; i < 1000; ++i) {
+      fnames.push_back(ift.name() + "/" + "testfile" + ToString(i));
+
+      // Create file.
+      std::unique_ptr<WritableFile> wfile;
+      ASSERT_OK(env_->NewWritableFile(fnames[i], &wfile, soptions));
+    }
+
+    // Collect and check whether the IDs are unique (iterate by reference).
+    std::unordered_set<std::string> ids;
+    for (const std::string& fname : fnames) {
+      std::unique_ptr<RandomAccessFile> file;
+      std::string unique_id;
+      ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+      size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
+      ASSERT_TRUE(id_size > 0);
+      unique_id = std::string(temp_id, id_size);
+      ASSERT_TRUE(IsUniqueIDValid(unique_id));
+
+      ASSERT_TRUE(ids.count(unique_id) == 0);
+      ids.insert(unique_id);
+    }
+
+    // Delete the files
+    for (const std::string& fname : fnames) {
+      ASSERT_OK(env_->DeleteFile(fname));
+    }
+
+    ASSERT_TRUE(!HasPrefix(ids));
+  }
+}
+
+// Only works in linux and WIN platforms
+TEST_P(EnvPosixTestWithParam, RandomAccessUniqueIDDeletes) {
+  // Unique IDs are only checked against the default Env.
+  if (env_ != Env::Default()) {
+    return;
+  }
+  EnvOptions soptions;
+  soptions.use_direct_writes = direct_io_;
+  soptions.use_direct_reads = soptions.use_direct_writes;
+  IoctlFriendlyTmpdir ift;
+  const std::string fname = ift.name() + "/" + "testfile";
+
+  // Re-create the same path 1000 times; an ID must never repeat after the
+  // previous incarnation of the file has been deleted.
+  std::unordered_set<std::string> ids;
+  int round = 0;
+  while (round < 1000) {
+    {
+      // Create the file; the handle is dropped at the end of this scope.
+      std::unique_ptr<WritableFile> wfile;
+      ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
+    }
+    std::string unique_id;
+    {
+      // Read the ID through a fresh random-access handle.
+      std::unique_ptr<RandomAccessFile> file;
+      ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+      size_t id_size = file->GetUniqueId(temp_id, MAX_ID_SIZE);
+      ASSERT_TRUE(id_size > 0);
+      unique_id.assign(temp_id, id_size);
+    }
+    ASSERT_TRUE(IsUniqueIDValid(unique_id));
+    ASSERT_TRUE(ids.count(unique_id) == 0);
+    ids.insert(unique_id);
+    ASSERT_OK(env_->DeleteFile(fname));
+    ++round;
+  }
+
+  ASSERT_TRUE(!HasPrefix(ids));
+}
+
+// Only works in linux platforms; given a DISABLED_ name when OS_WIN is set.
+#ifdef OS_WIN
+TEST_P(EnvPosixTestWithParam, DISABLED_InvalidateCache) {
+#else
+TEST_P(EnvPosixTestWithParam, InvalidateCache) {
+#endif
+ rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+ EnvOptions soptions;
+ soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
+ std::string fname = test::PerThreadDBPath(env_, "testfile");
+
+ const size_t kSectorSize = 512;
+ auto data = NewAligned(kSectorSize, 0);
+ Slice slice(data.get(), kSectorSize);
+
+ // Create file, append one sector and invalidate its cache.
+ {
+ std::unique_ptr<WritableFile> wfile;
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
+ if (soptions.use_direct_writes) {
+ soptions.use_direct_writes = false;  // direct writes are turned off here
+ }
+#endif
+ ASSERT_OK(env_->NewWritableFile(fname, &wfile, soptions));
+ ASSERT_OK(wfile->Append(slice));
+ ASSERT_OK(wfile->InvalidateCache(0, 0));
+ ASSERT_OK(wfile->Close());
+ }
+
+ // Random Read: the sector must read back unchanged
+ {
+ std::unique_ptr<RandomAccessFile> file;
+ auto scratch = NewAligned(kSectorSize, 0);
+ Slice result;
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
+ if (soptions.use_direct_reads) {
+ soptions.use_direct_reads = false;  // direct reads are turned off here
+ }
+#endif
+ ASSERT_OK(env_->NewRandomAccessFile(fname, &file, soptions));
+ ASSERT_OK(file->Read(0, kSectorSize, &result, scratch.get()));
+ ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
+ ASSERT_OK(file->InvalidateCache(0, 11));
+ ASSERT_OK(file->InvalidateCache(0, 0));
+ }
+
+ // Sequential Read: the sector must read back unchanged
+ {
+ std::unique_ptr<SequentialFile> file;
+ auto scratch = NewAligned(kSectorSize, 0);
+ Slice result;
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
+ if (soptions.use_direct_reads) {
+ soptions.use_direct_reads = false;  // direct reads are turned off here
+ }
+#endif
+ ASSERT_OK(env_->NewSequentialFile(fname, &file, soptions));
+ if (file->use_direct_io()) {
+ ASSERT_OK(file->PositionedRead(0, kSectorSize, &result, scratch.get()));
+ } else {
+ ASSERT_OK(file->Read(kSectorSize, &result, scratch.get()));
+ }
+ ASSERT_EQ(memcmp(scratch.get(), data.get(), kSectorSize), 0);
+ ASSERT_OK(file->InvalidateCache(0, 11));
+ ASSERT_OK(file->InvalidateCache(0, 0));
+ }
+ // Delete the file
+ ASSERT_OK(env_->DeleteFile(fname));
+ rocksdb::SyncPoint::GetInstance()->ClearTrace();
+}
+#endif // not TRAVIS
+#endif // OS_LINUX || OS_WIN
+
+class TestLogger : public Logger {  // Counts Logv calls and 'x' / NUL bytes.
+ public:
+  using Logger::Logv;
+  void Logv(const char* format, va_list ap) override {
+    log_count++;
+
+    char new_format[550];
+    std::fill_n(new_format, sizeof(new_format), '2');
+    {
+      va_list backup_ap;
+      va_copy(backup_ap, ap);
+      int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
+      // Release the copy right away: a failing ASSERT_TRUE below returns early.
+      va_end(backup_ap);
+// When we have n == -1 there is not a terminating zero expected
+#ifdef OS_WIN
+      if (n < 0) {
+        char_0_count++;
+      }
+#endif
+
+      if (new_format[0] == '[') {
+        // "[DEBUG] "
+        ASSERT_TRUE(n <= 56 + (512 - static_cast<int>(sizeof(struct timeval))));
+      } else {
+        // 48 bytes for extra information + bytes allocated
+        ASSERT_TRUE(n <= 48 + (512 - static_cast<int>(sizeof(struct timeval))));
+      }
+    }
+
+    for (size_t i = 0; i < sizeof(new_format); i++) {
+      if (new_format[i] == 'x') {
+        char_x_count++;
+      } else if (new_format[i] == '\0') {
+        char_0_count++;
+      }
+    }
+  }
+  int log_count = 0;     // number of Logv calls
+  int char_x_count = 0;  // 'x' bytes seen in the formatted buffers
+  int char_0_count = 0;  // NUL bytes seen in the formatted buffers
+};
+
+TEST_P(EnvPosixTestWithParam, LogBufferTest) {
+ TestLogger test_logger;
+ test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
+ test_logger.log_count = 0;
+ test_logger.char_x_count = 0;
+ test_logger.char_0_count = 0;
+ LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
+ LogBuffer log_buffer_debug(DEBUG_LEVEL, &test_logger);
+ // NUL-terminated runs of '1' of different lengths, used as %s arguments.
+ char bytes200[200];
+ std::fill_n(bytes200, sizeof(bytes200), '1');
+ bytes200[sizeof(bytes200) - 1] = '\0';
+ char bytes600[600];
+ std::fill_n(bytes600, sizeof(bytes600), '1');
+ bytes600[sizeof(bytes600) - 1] = '\0';
+ char bytes9000[9000];
+ std::fill_n(bytes9000, sizeof(bytes9000), '1');
+ bytes9000[sizeof(bytes9000) - 1] = '\0';
+ // Buffer five INFO-level entries; TestLogger counts their 'x' bytes later.
+ ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes200);
+ ROCKS_LOG_BUFFER(&log_buffer, "x%sx", bytes600);
+ ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx%sx", bytes200, bytes200, bytes200);
+ ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes200, bytes600);
+ ROCKS_LOG_BUFFER(&log_buffer, "x%sx%sx", bytes600, bytes9000);
+ // Two DEBUG entries: one before, one after the logger is set to DEBUG_LEVEL.
+ ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx", bytes200);
+ test_logger.SetInfoLogLevel(DEBUG_LEVEL);
+ ROCKS_LOG_BUFFER(&log_buffer_debug, "x%sx%sx%sx", bytes600, bytes9000,
+ bytes200);
+ // Nothing may reach the logger until the buffers are flushed.
+ ASSERT_EQ(0, test_logger.log_count);
+ log_buffer.FlushBufferToLog();
+ log_buffer_debug.FlushBufferToLog();
+ ASSERT_EQ(6, test_logger.log_count);  // 7 buffered; presumably 1 filtered
+ ASSERT_EQ(6, test_logger.char_0_count);
+ ASSERT_EQ(10, test_logger.char_x_count);
+}
+
+class TestLogger2 : public Logger {  // Checks the length of each logged line.
+ public:
+  explicit TestLogger2(size_t max_log_size) : max_log_size_(max_log_size) {}
+  using Logger::Logv;
+  void Logv(const char* format, va_list ap) override {
+    char new_format[2000];
+    std::fill_n(new_format, sizeof(new_format), '2');
+    {
+      va_list backup_ap;
+      va_copy(backup_ap, ap);
+      int n = vsnprintf(new_format, sizeof(new_format) - 1, format, backup_ap);
+      va_end(backup_ap);  // before the ASSERTs, which return on failure
+      // 48 bytes for extra information + bytes allocated
+      ASSERT_TRUE(
+          n <= 48 + static_cast<int>(max_log_size_ - sizeof(struct timeval)));
+      ASSERT_TRUE(n > static_cast<int>(max_log_size_ - sizeof(struct timeval)));
+    }
+  }
+  size_t max_log_size_;  // cap handed to the constructor by the test
+};
+
+TEST_P(EnvPosixTestWithParam, LogBufferMaxSizeTest) {
+  // 8999 '1' characters plus a terminating NUL.
+  char bytes9000[9000];
+  std::fill_n(bytes9000, sizeof(bytes9000) - 1, '1');
+  bytes9000[sizeof(bytes9000) - 1] = '\0';
+  // Log the string once per cap (256, then 1024); TestLogger2 checks lengths.
+  for (const size_t max_log_size : {size_t{256}, size_t{1024}}) {
+    TestLogger2 test_logger(max_log_size);
+    test_logger.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL);
+    LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, &test_logger);
+    ROCKS_LOG_BUFFER_MAX_SZ(&log_buffer, max_log_size, "%s", bytes9000);
+    log_buffer.FlushBufferToLog();
+  }
+}
+
+TEST_P(EnvPosixTestWithParam, Preallocation) {  // Tracks preallocated blocks.
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  const std::string src = test::PerThreadDBPath(env_, "testfile");
+  std::unique_ptr<WritableFile> srcfile;
+  EnvOptions soptions;
+  soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
+  if (soptions.use_direct_writes) {
+    // Clear O_DIRECT from the int handed to this sync point's callback.
+    rocksdb::SyncPoint::GetInstance()->SetCallBack(
+        "NewWritableFile:O_DIRECT", [&](void* arg) {
+          int* val = static_cast<int*>(arg);
+          *val &= ~O_DIRECT;
+        });
+  }
+#endif
+  ASSERT_OK(env_->NewWritableFile(src, &srcfile, soptions));
+  srcfile->SetPreallocationBlockSize(1024 * 1024);
+  // No writes should mean no preallocation
+  size_t block_size, last_allocated_block;
+  srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
+  ASSERT_EQ(last_allocated_block, 0UL);
+
+  // Small write should preallocate one block
+  size_t kStrSize = 4096;
+  auto data = NewAligned(kStrSize, 'A');
+  Slice str(data.get(), kStrSize);
+  srcfile->PrepareWrite(srcfile->GetFileSize(), kStrSize);
+  ASSERT_OK(srcfile->Append(str));
+  srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
+  ASSERT_EQ(last_allocated_block, 1UL);
+
+  // Write an entire preallocation block, make sure we increased by two.
+  {
+    auto buf_ptr = NewAligned(block_size, ' ');
+    Slice buf(buf_ptr.get(), block_size);
+    srcfile->PrepareWrite(srcfile->GetFileSize(), block_size);
+    ASSERT_OK(srcfile->Append(buf));
+    srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
+    ASSERT_EQ(last_allocated_block, 2UL);
+  }
+
+  // Write five more blocks at once, ensure we're where we need to be.
+  {
+    auto buf_ptr = NewAligned(block_size * 5, ' ');
+    Slice buf = Slice(buf_ptr.get(), block_size * 5);
+    srcfile->PrepareWrite(srcfile->GetFileSize(), buf.size());
+    ASSERT_OK(srcfile->Append(buf));
+    srcfile->GetPreallocationStatus(&block_size, &last_allocated_block);
+    ASSERT_EQ(last_allocated_block, 7UL);
+  }
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+}
+
+// Test that the two ways to get children file attributes (in bulk or
+// individually) behave consistently.
+//
+// Creates kNumChildren files where file i holds 4096 * i bytes, then checks
+// that GetChildrenFileAttributes() and GetFileSize() report that same size.
+TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) {
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+  EnvOptions soptions;
+  soptions.use_direct_reads = soptions.use_direct_writes = direct_io_;
+  const int kNumChildren = 10;
+
+#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX) && !defined(OS_OPENBSD) && !defined(OS_FREEBSD)
+  // The callback does not depend on the loop variable, so register it once
+  // up front instead of on every iteration.
+  if (soptions.use_direct_writes) {
+    rocksdb::SyncPoint::GetInstance()->SetCallBack(
+        "NewWritableFile:O_DIRECT", [&](void* arg) {
+          int* val = static_cast<int*>(arg);
+          *val &= ~O_DIRECT;
+        });
+  }
+#endif
+
+  std::string data;
+  for (int i = 0; i < kNumChildren; ++i) {
+    const std::string path =
+        test::TmpDir(env_) + "/" + "testfile_" + std::to_string(i);
+    std::unique_ptr<WritableFile> file;
+    ASSERT_OK(env_->NewWritableFile(path, &file, soptions));
+    auto buf_ptr = NewAligned(data.size(), 'T');
+    Slice buf(buf_ptr.get(), data.size());
+    // The size assertions below are only meaningful if the write succeeded.
+    ASSERT_OK(file->Append(buf));
+    // Grow by 4096 bytes so the next file is one step larger.
+    data.append(std::string(4096, 'T'));
+  }
+
+  std::vector<Env::FileAttributes> file_attrs;
+  ASSERT_OK(env_->GetChildrenFileAttributes(test::TmpDir(env_), &file_attrs));
+  for (int i = 0; i < kNumChildren; ++i) {
+    const std::string name = "testfile_" + std::to_string(i);
+    const std::string path = test::TmpDir(env_) + "/" + name;
+
+    auto file_attrs_iter = std::find_if(
+        file_attrs.begin(), file_attrs.end(),
+        [&name](const Env::FileAttributes& fm) { return fm.name == name; });
+    ASSERT_TRUE(file_attrs_iter != file_attrs.end());
+    uint64_t size;
+    ASSERT_OK(env_->GetFileSize(path, &size));
+    // Compare as unsigned to avoid a signed/unsigned mismatch.
+    ASSERT_EQ(size, static_cast<uint64_t>(4096 * i));
+    ASSERT_EQ(size, file_attrs_iter->size_bytes);
+  }
+  rocksdb::SyncPoint::GetInstance()->ClearTrace();
+}
+
+// Test that WritableFileWrapper forwards every call to the wrapped
+// WritableFile.
+//
+// Base is a WritableFile whose every method calls inc(k) with a distinct,
+// increasing k. inc() expects the shared step counter to equal k and then
+// bumps it, so the test passes only if the wrapper reaches each Base method
+// exactly once and in the order the calls are issued below.
+TEST_P(EnvPosixTestWithParam, WritableFileWrapper) {
+ class Base : public WritableFile {
+ public:
+ // Points at the step counter owned by the test body.
+ mutable int *step_;
+
+ // Expects the counter to currently equal x, then advances it by one.
+ void inc(int x) const {
+ EXPECT_EQ(x, (*step_)++);
+ }
+
+ // Step 0: construction.
+ explicit Base(int* step) : step_(step) {
+ inc(0);
+ }
+
+ Status Append(const Slice& /*data*/) override {
+ inc(1);
+ return Status::OK();
+ }
+
+ Status PositionedAppend(const Slice& /*data*/,
+ uint64_t /*offset*/) override {
+ inc(2);
+ return Status::OK();
+ }
+
+ Status Truncate(uint64_t /*size*/) override {
+ inc(3);
+ return Status::OK();
+ }
+
+ Status Close() override {
+ inc(4);
+ return Status::OK();
+ }
+
+ Status Flush() override {
+ inc(5);
+ return Status::OK();
+ }
+
+ Status Sync() override {
+ inc(6);
+ return Status::OK();
+ }
+
+ Status Fsync() override {
+ inc(7);
+ return Status::OK();
+ }
+
+ bool IsSyncThreadSafe() const override {
+ inc(8);
+ return true;
+ }
+
+ bool use_direct_io() const override {
+ inc(9);
+ return true;
+ }
+
+ size_t GetRequiredBufferAlignment() const override {
+ inc(10);
+ return 0;
+ }
+
+ void SetIOPriority(Env::IOPriority /*pri*/) override { inc(11); }
+
+ Env::IOPriority GetIOPriority() override {
+ inc(12);
+ return Env::IOPriority::IO_LOW;
+ }
+
+ void SetWriteLifeTimeHint(Env::WriteLifeTimeHint /*hint*/) override {
+ inc(13);
+ }
+
+ Env::WriteLifeTimeHint GetWriteLifeTimeHint() override {
+ inc(14);
+ return Env::WriteLifeTimeHint::WLTH_NOT_SET;
+ }
+
+ uint64_t GetFileSize() override {
+ inc(15);
+ return 0;
+ }
+
+ void SetPreallocationBlockSize(size_t /*size*/) override { inc(16); }
+
+ void GetPreallocationStatus(size_t* /*block_size*/,
+ size_t* /*last_allocated_block*/) override {
+ inc(17);
+ }
+
+ size_t GetUniqueId(char* /*id*/, size_t /*max_size*/) const override {
+ inc(18);
+ return 0;
+ }
+
+ Status InvalidateCache(size_t /*offset*/, size_t /*length*/) override {
+ inc(19);
+ return Status::OK();
+ }
+
+ Status RangeSync(uint64_t /*offset*/, uint64_t /*nbytes*/) override {
+ inc(20);
+ return Status::OK();
+ }
+
+ void PrepareWrite(size_t /*offset*/, size_t /*len*/) override { inc(21); }
+
+ Status Allocate(uint64_t /*offset*/, uint64_t /*len*/) override {
+ inc(22);
+ return Status::OK();
+ }
+
+ public:
+ // Step 23: destruction, the last expected step.
+ ~Base() override { inc(23); }
+ };
+
+ // Adds nothing of its own; every call goes through WritableFileWrapper.
+ class Wrapper : public WritableFileWrapper {
+ public:
+ explicit Wrapper(WritableFile* target) : WritableFileWrapper(target) {}
+ };
+
+ int step = 0;
+
+ {
+ Base b(&step);
+ Wrapper w(&b);
+ // Call order must match the step numbers hard-coded in Base.
+ w.Append(Slice());
+ w.PositionedAppend(Slice(), 0);
+ w.Truncate(0);
+ w.Close();
+ w.Flush();
+ w.Sync();
+ w.Fsync();
+ w.IsSyncThreadSafe();
+ w.use_direct_io();
+ w.GetRequiredBufferAlignment();
+ w.SetIOPriority(Env::IOPriority::IO_HIGH);
+ w.GetIOPriority();
+ w.SetWriteLifeTimeHint(Env::WriteLifeTimeHint::WLTH_NOT_SET);
+ w.GetWriteLifeTimeHint();
+ w.GetFileSize();
+ w.SetPreallocationBlockSize(0);
+ w.GetPreallocationStatus(nullptr, nullptr);
+ w.GetUniqueId(nullptr, 0);
+ w.InvalidateCache(0, 0);
+ w.RangeSync(0, 0);
+ w.PrepareWrite(0, 0);
+ w.Allocate(0, 0);
+ }
+
+ // Steps 0..23 all ran (constructor, 22 forwarded calls, destructor).
+ EXPECT_EQ(24, step);
+}
+
+// Exercises RandomRWFile with a fixed sequence of overlapping writes and
+// reads, including a close/reopen in the middle, and checks that every read
+// returns exactly the bytes written so far.
+TEST_P(EnvPosixTestWithParam, PosixRandomRWFile) {
+  const std::string path = test::PerThreadDBPath(env_, "random_rw_file");
+
+  // Best-effort removal of leftovers from an earlier run; the file usually
+  // does not exist, so the status is intentionally not checked.
+  env_->DeleteFile(path);
+
+  std::unique_ptr<RandomRWFile> file;
+
+  // Cannot open non-existing file.
+  ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions()));
+
+  // Create the file using WritableFile
+  {
+    std::unique_ptr<WritableFile> wf;
+    ASSERT_OK(env_->NewWritableFile(path, &wf, EnvOptions()));
+  }
+
+  ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
+
+  char buf[10000];
+  Slice read_res;
+
+  ASSERT_OK(file->Write(0, "ABCD"));
+  ASSERT_OK(file->Read(0, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ABCD");
+
+  ASSERT_OK(file->Write(2, "XXXX"));
+  ASSERT_OK(file->Read(0, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ABXXXX");
+
+  ASSERT_OK(file->Write(10, "ZZZ"));
+  ASSERT_OK(file->Read(10, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ZZZ");
+
+  ASSERT_OK(file->Write(11, "Y"));
+  ASSERT_OK(file->Read(10, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ZYZ");
+
+  ASSERT_OK(file->Write(200, "FFFFF"));
+  ASSERT_OK(file->Read(200, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "FFFFF");
+
+  ASSERT_OK(file->Write(205, "XXXX"));
+  ASSERT_OK(file->Read(200, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "FFFFFXXXX");
+
+  ASSERT_OK(file->Write(5, "QQQQ"));
+  ASSERT_OK(file->Read(0, 9, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ");
+
+  ASSERT_OK(file->Read(2, 4, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "XXXQ");
+
+  // Close file and reopen it. A failed Close() could mean the data never
+  // reached the file, so check it before re-reading.
+  ASSERT_OK(file->Close());
+  ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
+
+  ASSERT_OK(file->Read(0, 9, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ABXXXQQQQ");
+
+  ASSERT_OK(file->Read(10, 3, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ZYZ");
+
+  ASSERT_OK(file->Read(200, 9, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "FFFFFXXXX");
+
+  ASSERT_OK(file->Write(4, "TTTTTTTTTTTTTTTT"));
+  ASSERT_OK(file->Read(0, 10, &read_res, buf));
+  ASSERT_EQ(read_res.ToString(), "ABXXTTTTTT");
+
+  // Clean up; the file exists at this point, so deletion must succeed.
+  ASSERT_OK(env_->DeleteFile(path));
+}
+
+// Test helper that applies every write both to a RandomRWFile and to an
+// in-memory std::string "mirror", and on every read checks that the two
+// agree. Unwritten gaps in the mirror are '\0', and comparisons stop at the
+// first '\0' on both sides.
+class RandomRWFileWithMirrorString {
+ public:
+  // Does not take ownership of _file.
+  explicit RandomRWFileWithMirrorString(RandomRWFile* _file) : file_(_file) {}
+
+  // Writes data at offset to both the mirror string and the file.
+  void Write(size_t offset, const std::string& data) {
+    // Write to mirror string
+    StringWrite(offset, data);
+
+    // Write to file
+    Status s = file_->Write(offset, data);
+    ASSERT_OK(s) << s.ToString();
+  }
+
+  // Reads up to n bytes at offset from both sides and asserts they match.
+  // n is clamped to the size of the internal scratch buffer: the default of
+  // 1000000 is larger than buf_, and handing that to the file read could
+  // write past the end of buf_.
+  void Read(size_t offset = 0, size_t n = 1000000) {
+    n = std::min(n, sizeof(buf_));
+
+    Slice str_res(nullptr, 0);
+    if (offset < file_mirror_.size()) {
+      size_t str_res_sz = std::min(file_mirror_.size() - offset, n);
+      str_res = Slice(file_mirror_.data() + offset, str_res_sz);
+      StopSliceAtNull(&str_res);
+    }
+
+    Slice file_res;
+    Status s = file_->Read(offset, n, &file_res, buf_);
+    ASSERT_OK(s) << s.ToString();
+    StopSliceAtNull(&file_res);
+
+    ASSERT_EQ(str_res.ToString(), file_res.ToString()) << offset << " " << n;
+  }
+
+  // Switches to a different file handle (e.g. after a reopen); the mirror
+  // contents are kept.
+  void SetFile(RandomRWFile* _file) { file_ = _file; }
+
+ private:
+  // Overwrites mirror bytes [offset, offset + src.size()), growing the mirror
+  // with '\0' padding first if the range extends past its current end.
+  void StringWrite(size_t offset, const std::string& src) {
+    if (offset + src.size() > file_mirror_.size()) {
+      file_mirror_.resize(offset + src.size(), '\0');
+    }
+
+    // The range is in bounds after the resize above; replace() overwrites it
+    // in place without casting away constness of data().
+    file_mirror_.replace(offset, src.size(), src);
+  }
+
+  // Truncates *slc at its first '\0' byte, if any.
+  void StopSliceAtNull(Slice* slc) {
+    for (size_t i = 0; i < slc->size(); i++) {
+      if ((*slc)[i] == '\0') {
+        *slc = Slice(slc->data(), i);
+        break;
+      }
+    }
+  }
+
+  char buf_[10000];
+  RandomRWFile* file_;
+  std::string file_mirror_;
+};
+
+// Randomized RandomRWFile test: 10000 rounds of a 10-byte write at a random
+// offset below 1000 followed by a random read, each checked against an
+// in-memory mirror. The file is reopened every 500 rounds.
+TEST_P(EnvPosixTestWithParam, PosixRandomRWFileRandomized) {
+  const std::string path = test::PerThreadDBPath(env_, "random_rw_file_rand");
+  // Best-effort removal of leftovers; the file usually does not exist yet.
+  env_->DeleteFile(path);
+
+  std::unique_ptr<RandomRWFile> file;
+
+#ifdef OS_LINUX
+  // Cannot open non-existing file.
+  ASSERT_NOK(env_->NewRandomRWFile(path, &file, EnvOptions()));
+#endif
+
+  // Create the file using WritableFile
+  {
+    std::unique_ptr<WritableFile> wf;
+    ASSERT_OK(env_->NewWritableFile(path, &wf, EnvOptions()));
+  }
+
+  ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
+  RandomRWFileWithMirrorString file_with_mirror(file.get());
+
+  // Fixed seed keeps the sequence reproducible.
+  Random rnd(301);
+  std::string buf;
+  for (int i = 0; i < 10000; i++) {
+    // Generate random data
+    test::RandomString(&rnd, 10, &buf);
+
+    // Pick random offset for write
+    size_t write_off = rnd.Next() % 1000;
+    file_with_mirror.Write(write_off, buf);
+
+    // Pick random offset for read
+    size_t read_off = rnd.Next() % 1000;
+    size_t read_sz = rnd.Next() % 20;
+    file_with_mirror.Read(read_off, read_sz);
+
+    if (i % 500 == 0) {
+      // Reopen the file every 500 iters
+      ASSERT_OK(env_->NewRandomRWFile(path, &file, EnvOptions()));
+      file_with_mirror.SetFile(file.get());
+    }
+  }
+
+  // Clean up; the file exists at this point, so deletion must succeed.
+  ASSERT_OK(env_->DeleteFile(path));
+}
+
+// EnvWrapper over Env::Default() whose NewLogger() returns TestLogger
+// objects. Each TestLogger reports back to this env when its close helper
+// runs, so a test can count how many times that happened.
+class TestEnv : public EnvWrapper {
+ public:
+  explicit TestEnv() : EnvWrapper(Env::Default()), close_count(0) {}
+
+  // Logger that discards all output and notifies its TestEnv on close.
+  class TestLogger : public Logger {
+   public:
+    using Logger::Logv;
+    TestLogger(TestEnv* env_ptr) : Logger(), env(env_ptr) {}
+    ~TestLogger() override {
+      // closed_ is not declared in this class (presumably inherited from
+      // Logger); when it is already set the helper must not run again.
+      if (closed_) {
+        return;
+      }
+      CloseHelper();
+    }
+    void Logv(const char* /*format*/, va_list /*ap*/) override {}
+
+   protected:
+    Status CloseImpl() override { return CloseHelper(); }
+
+   private:
+    // Bumps the owning env's close counter and reports success.
+    Status CloseHelper() {
+      env->CloseCountInc();
+      return Status::OK();
+    }
+    TestEnv* env;
+  };
+
+  void CloseCountInc() { ++close_count; }
+
+  int GetCloseCount() { return close_count; }
+
+  // Ignores the file name and always hands back a fresh TestLogger.
+  Status NewLogger(const std::string& /*fname*/,
+                   std::shared_ptr<Logger>* result) override {
+    result->reset(new TestLogger(this));
+    return Status::OK();
+  }
+
+ private:
+  int close_count;
+};
+
+class EnvTest : public testing::Test {};
+
+// Verifies the Logger close contract as seen through TestEnv's counter:
+// an explicit Close() runs the close helper once, a second Close() and the
+// later destruction do not run it again, and a logger destroyed without an
+// explicit Close() runs it exactly once.
+TEST_F(EnvTest, Close) {
+  // Owned by a smart pointer so the env is released even when an ASSERT_*
+  // returns early. Declared before `logger` so it outlives it: TestLogger's
+  // destructor may still call back into the env.
+  std::unique_ptr<TestEnv> env(new TestEnv());
+  std::shared_ptr<Logger> logger;
+  Status s;
+
+  s = env->NewLogger("", &logger);
+  ASSERT_EQ(s, Status::OK());
+  logger->Close();
+  ASSERT_EQ(env->GetCloseCount(), 1);
+  // Call Close() again. CloseHelper() should not be called again
+  logger->Close();
+  ASSERT_EQ(env->GetCloseCount(), 1);
+  logger.reset();
+  ASSERT_EQ(env->GetCloseCount(), 1);
+
+  s = env->NewLogger("", &logger);
+  ASSERT_EQ(s, Status::OK());
+  // Destruction without an explicit Close() must run the helper once.
+  logger.reset();
+  ASSERT_EQ(env->GetCloseCount(), 2);
+}
+
+// Instantiate the parameterized suite. Each parameter is an (Env*, bool)
+// pair; the bool distinguishes the "WithoutDirectIO" (false) and
+// "WithDirectIO" (true) variants.
+INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO, EnvPosixTestWithParam,
+ ::testing::Values(std::pair<Env*, bool>(Env::Default(),
+ false)));
+#if !defined(ROCKSDB_LITE)
+INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO, EnvPosixTestWithParam,
+ ::testing::Values(std::pair<Env*, bool>(Env::Default(),
+ true)));
+#endif // !defined(ROCKSDB_LITE)
+
+#if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
+// File-scope chroot Env built over the default Env and the test temp dir;
+// it stays alive for the whole run so the raw pointers handed to the
+// instantiations below remain valid.
+static std::unique_ptr<Env> chroot_env(
+ NewChrootEnv(Env::Default(), test::TmpDir(Env::Default())));
+INSTANTIATE_TEST_CASE_P(
+ ChrootEnvWithoutDirectIO, EnvPosixTestWithParam,
+ ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), false)));
+INSTANTIATE_TEST_CASE_P(
+ ChrootEnvWithDirectIO, EnvPosixTestWithParam,
+ ::testing::Values(std::pair<Env*, bool>(chroot_env.get(), true)));
+#endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
+
+} // namespace rocksdb
+
+// Test binary entry point: hand the command line to googletest, run every
+// registered test, and use the result as the process exit code.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  const int exit_code = RUN_ALL_TESTS();
+  return exit_code;
+}
diff --git a/src/rocksdb/env/io_posix.cc b/src/rocksdb/env/io_posix.cc
new file mode 100644
index 00000000..628ed841
--- /dev/null
+++ b/src/rocksdb/env/io_posix.cc
@@ -0,0 +1,1082 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#ifdef ROCKSDB_LIB_IO_POSIX
+#include "env/io_posix.h"
+#include <errno.h>
+#include <fcntl.h>
+#include <algorithm>
+#if defined(OS_LINUX)
+#include <linux/fs.h>
+#endif
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#ifdef OS_LINUX
+#include <sys/statfs.h>
+#include <sys/syscall.h>
+#include <sys/sysmacros.h>
+#endif
+#include "env/posix_logger.h"
+#include "monitoring/iostats_context_imp.h"
+#include "port/port.h"
+#include "rocksdb/slice.h"
+#include "util/coding.h"
+#include "util/string_util.h"
+#include "util/sync_point.h"
+
+#if defined(OS_LINUX) && !defined(F_SET_RW_HINT)
+#define F_LINUX_SPECIFIC_BASE 1024
+#define F_SET_RW_HINT (F_LINUX_SPECIFIC_BASE + 12)
+#endif
+
+namespace rocksdb {
+
+// A wrapper for fadvise, if the platform doesn't support fadvise,
+// it will simply return 0.
+//
+// On Linux the arguments are passed straight to posix_fadvise() and its
+// return value is returned unchanged. On every other platform the arguments
+// are ignored and 0 is returned.
+int Fadvise(int fd, off_t offset, size_t len, int advice) {
+#ifdef OS_LINUX
+ return posix_fadvise(fd, offset, len, advice);
+#else
+ (void)fd;
+ (void)offset;
+ (void)len;
+ (void)advice;
+ return 0; // simply do nothing.
+#endif
+}
+
+namespace {
+// Returns the logical block size of the block device backing `fd`.
+//
+// On Linux the value is read from
+// /sys/dev/block/<major>:<minor>/.../queue/logical_block_size. Whenever any
+// step fails (fstat, realpath, path parsing, opening or parsing the sysfs
+// file), or the value read is zero or not a power of two, kDefaultPageSize is
+// returned instead. On non-Linux builds kDefaultPageSize is always returned.
+size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
+#ifdef OS_LINUX
+ struct stat buf;
+ int result = fstat(fd, &buf);
+ if (result == -1) {
+ return kDefaultPageSize;
+ }
+ if (major(buf.st_dev) == 0) {
+ // Unnamed devices (e.g. non-device mounts), reserved as null device number.
+ // These don't have an entry in /sys/dev/block/. Return a sensible default.
+ return kDefaultPageSize;
+ }
+
+ // Reading queue/logical_block_size does not require special permissions.
+ const int kBufferSize = 100;
+ char path[kBufferSize];
+ char real_path[PATH_MAX + 1];
+ snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
+ minor(buf.st_dev));
+ // Resolve the sysfs symlink to the real device directory.
+ if (realpath(path, real_path) == nullptr) {
+ return kDefaultPageSize;
+ }
+ std::string device_dir(real_path);
+ // Drop a trailing '/' so the rfind() calls below see the last component.
+ if (!device_dir.empty() && device_dir.back() == '/') {
+ device_dir.pop_back();
+ }
+ // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
+ // and nvme0n1 have it.
+ // $ ls -al '/sys/dev/block/8:3'
+ // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
+ // ../../block/sda/sda3
+ // $ ls -al '/sys/dev/block/259:4'
+ // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
+ // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
+ // parent_end: position of the '/' before the last path component (child).
+ size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
+ if (parent_end == std::string::npos) {
+ return kDefaultPageSize;
+ }
+ // parent_begin: position of the '/' before the second-to-last component.
+ size_t parent_begin = device_dir.rfind('/', parent_end - 1);
+ if (parent_begin == std::string::npos) {
+ return kDefaultPageSize;
+ }
+ std::string parent =
+ device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
+ std::string child = device_dir.substr(parent_end + 1, std::string::npos);
+ // Unless the last component sits directly under a "block" directory, step
+ // up to its parent directory when the component either does not start with
+ // "nvme" or contains a 'p' (i.e. it is taken to be a partition entry).
+ if (parent != "block" &&
+ (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
+ device_dir = device_dir.substr(0, parent_end);
+ }
+ std::string fname = device_dir + "/queue/logical_block_size";
+ FILE* fp;
+ size_t size = 0;
+ fp = fopen(fname.c_str(), "r");
+ if (fp != nullptr) {
+ char* line = nullptr;
+ size_t len = 0;
+ if (getline(&line, &len, fp) != -1) {
+ // size stays 0 if the line does not parse as a number.
+ sscanf(line, "%zu", &size);
+ }
+ // getline() may have allocated `line` even when it reports failure.
+ free(line);
+ fclose(fp);
+ }
+ // Accept only a non-zero power of two.
+ if (size != 0 && (size & (size - 1)) == 0) {
+ return size;
+ }
+#endif
+ return kDefaultPageSize;
+}
+} // namespace
+
+/*
+ * DirectIOHelper
+ */
+#ifndef NDEBUG
+namespace {
+
+// True when the byte offset/length `off` is a multiple of `sector_size`.
+// Compiled only in debug builds (used from assert()).
+bool IsSectorAligned(const size_t off, size_t sector_size) {
+  const size_t remainder = off % sector_size;
+  return remainder == 0;
+}
+
+// True when the address held in `ptr` is a multiple of `sector_size`.
+bool IsSectorAligned(const void* ptr, size_t sector_size) {
+  const uintptr_t address = reinterpret_cast<uintptr_t>(ptr);
+  return address % sector_size == 0;
+}
+
+}
+#endif
+
+/*
+ * PosixSequentialFile
+ */
+// Stores the file name, the FILE* stream and the descriptor; direct I/O mode
+// is taken from options.use_direct_reads and the logical sector size is
+// queried from the descriptor. Direct reads and mmap reads must not both be
+// requested.
+PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
+ int fd, const EnvOptions& options)
+ : filename_(fname),
+ file_(file),
+ fd_(fd),
+ use_direct_io_(options.use_direct_reads),
+ logical_sector_size_(GetLogicalBufferSize(fd_)) {
+ assert(!options.use_direct_reads || !options.use_mmap_reads);
+}
+
+// Releases whichever handle this mode uses: the raw descriptor in direct I/O
+// mode, the FILE* stream otherwise.
+PosixSequentialFile::~PosixSequentialFile() {
+  if (use_direct_io()) {
+    assert(fd_);
+    close(fd_);
+    return;
+  }
+  assert(file_);
+  fclose(file_);
+}
+
+// Reads up to n bytes from the FILE* stream into scratch and points *result
+// at the bytes actually read. Only valid when direct I/O is off.
+//
+// Returns OK on a full read and on a short read caused by end of file (the
+// EOF indicator is cleared so later reads can pick up newly appended data).
+// A short read that is not at EOF returns an IOError.
+Status PosixSequentialFile::Read(size_t n, Slice* result, char* scratch) {
+  assert(result != nullptr && !use_direct_io());
+  Status s;
+  size_t r = 0;
+  do {
+    // The stream's error indicator is sticky. Clear it before every attempt
+    // so the retry condition below only reacts to an error raised by this
+    // fread call, never to a stale indicator left by an earlier one (which,
+    // combined with a stale errno == EINTR, could spin forever at EOF).
+    clearerr(file_);
+    r = fread_unlocked(scratch, 1, n, file_);
+  } while (r == 0 && ferror(file_) && errno == EINTR);
+  *result = Slice(scratch, r);
+  if (r < n) {
+    if (feof(file_)) {
+      // We leave status as ok if we hit the end of the file
+      // We also clear the error so that the reads can continue
+      // if a new data is written to the file
+      clearerr(file_);
+    } else {
+      // A partial read with an error: return a non-ok status
+      s = IOError("While reading file sequentially", filename_, errno);
+    }
+  }
+  return s;
+}
+
+// Direct-I/O read of up to n bytes at `offset` into scratch via pread().
+// offset, n and scratch are asserted to be aligned to
+// GetRequiredBufferAlignment().
+//
+// On success *result covers the bytes read, which may be fewer than n (end
+// of file). If pread() fails with anything other than EINTR, an IOError is
+// returned and *result is empty.
+Status PosixSequentialFile::PositionedRead(uint64_t offset, size_t n,
+ Slice* result, char* scratch) {
+ assert(use_direct_io());
+ assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
+ assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
+ assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
+
+ Status s;
+ ssize_t r = -1;
+ size_t left = n;
+ char* ptr = scratch;
+ while (left > 0) {
+ r = pread(fd_, ptr, left, static_cast<off_t>(offset));
+ if (r <= 0) {
+ // Retry when interrupted; stop on EOF (0) or any other error (-1).
+ if (r == -1 && errno == EINTR) {
+ continue;
+ }
+ break;
+ }
+ ptr += r;
+ offset += r;
+ left -= r;
+ if (r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
+ // Bytes reads don't fill sectors. Should only happen at the end
+ // of the file.
+ break;
+ }
+ }
+ if (r < 0) {
+ // An error: return a non-ok status
+ // NOTE(review): `offset` has been advanced by earlier successful reads,
+ // so the message shows the failing position, not the caller's offset.
+ s = IOError(
+ "While pread " + ToString(n) + " bytes from offset " + ToString(offset),
+ filename_, errno);
+ }
+ *result = Slice(scratch, (r < 0) ? 0 : n - left);
+ return s;
+}
+
+// Advances the stream position by n bytes relative to the current position.
+// Returns an IOError when fseek() reports failure.
+Status PosixSequentialFile::Skip(uint64_t n) {
+  const int rc = fseek(file_, static_cast<long int>(n), SEEK_CUR);
+  if (rc == 0) {
+    return Status::OK();
+  }
+  return IOError("While fseek to skip " + ToString(n) + " bytes", filename_,
+                 errno);
+}
+
+// Advises the OS (POSIX_FADV_DONTNEED) that the cached pages for
+// [offset, offset + length) are no longer needed. Does nothing and returns
+// OK on non-Linux builds and in direct I/O mode.
+Status PosixSequentialFile::InvalidateCache(size_t offset, size_t length) {
+#ifndef OS_LINUX
+  (void)offset;
+  (void)length;
+  return Status::OK();
+#else
+  if (!use_direct_io()) {
+    // free OS pages
+    int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
+    if (ret != 0) {
+      // posix_fadvise() returns the error number directly and does not set
+      // errno, so report `ret` rather than a possibly stale errno.
+      return IOError("While fadvise NotNeeded offset " + ToString(offset) +
+                         " len " + ToString(length),
+                     filename_, ret);
+    }
+  }
+  return Status::OK();
+#endif
+}
+
+/*
+ * PosixHelper::GetUniqueIdFromFile
+ */
+#if defined(OS_LINUX)
+// Linux variant: writes an id for the file behind `fd` into `id` as three
+// varint64 values -- st_dev, st_ino and the value fetched with the
+// FS_IOC_GETVERSION ioctl -- and returns the number of bytes written.
+//
+// Returns 0 (no id) when max_size is smaller than 3 * kMaxVarint64Length,
+// when fstat() fails, or when the ioctl result is -1.
+size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
+ if (max_size < kMaxVarint64Length * 3) {
+ return 0;
+ }
+
+ struct stat buf;
+ int result = fstat(fd, &buf);
+ if (result == -1) {
+ return 0;
+ }
+
+ long version = 0;
+ result = ioctl(fd, FS_IOC_GETVERSION, &version);
+ // Test hook: a registered callback may overwrite `result`.
+ TEST_SYNC_POINT_CALLBACK("GetUniqueIdFromFile:FS_IOC_GETVERSION", &result);
+ if (result == -1) {
+ return 0;
+ }
+ uint64_t uversion = (uint64_t)version;
+
+ char* rid = id;
+ rid = EncodeVarint64(rid, buf.st_dev);
+ rid = EncodeVarint64(rid, buf.st_ino);
+ rid = EncodeVarint64(rid, uversion);
+ assert(rid >= id);
+ return static_cast<size_t>(rid - id);
+}
+#endif
+
+#if defined(OS_MACOSX) || defined(OS_AIX)
+// macOS/AIX variant: writes an id for the file behind `fd` into `id` as
+// three varint64 values -- st_dev, st_ino and st_gen from fstat() -- and
+// returns the number of bytes written.
+//
+// Returns 0 (no id) when max_size is smaller than 3 * kMaxVarint64Length or
+// when fstat() fails.
+size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
+ if (max_size < kMaxVarint64Length * 3) {
+ return 0;
+ }
+
+ struct stat buf;
+ int result = fstat(fd, &buf);
+ if (result == -1) {
+ return 0;
+ }
+
+ char* rid = id;
+ rid = EncodeVarint64(rid, buf.st_dev);
+ rid = EncodeVarint64(rid, buf.st_ino);
+ rid = EncodeVarint64(rid, buf.st_gen);
+ assert(rid >= id);
+ return static_cast<size_t>(rid - id);
+}
+#endif
+/*
+ * PosixRandomAccessFile
+ *
+ * pread() based random-access
+ */
+// Stores the file name and descriptor (closed in the destructor); direct I/O
+// mode is taken from options.use_direct_reads and the logical sector size is
+// queried from the descriptor.
+PosixRandomAccessFile::PosixRandomAccessFile(const std::string& fname, int fd,
+ const EnvOptions& options)
+ : filename_(fname),
+ fd_(fd),
+ use_direct_io_(options.use_direct_reads),
+ logical_sector_size_(GetLogicalBufferSize(fd_)) {
+ // Direct reads and mmap reads are mutually exclusive.
+ assert(!options.use_direct_reads || !options.use_mmap_reads);
+ // With mmap reads requested, this class is only expected where pointers
+ // are narrower than 8 bytes.
+ assert(!options.use_mmap_reads || sizeof(void*) < 8);
+}
+
+// Closes the descriptor this object was constructed with.
+PosixRandomAccessFile::~PosixRandomAccessFile() {
+  close(fd_);
+}
+
+// Reads up to n bytes at `offset` into scratch with pread(), looping over
+// short reads and retrying on EINTR. In direct I/O mode offset, n and
+// scratch are asserted to be aligned to GetRequiredBufferAlignment().
+//
+// On success *result covers the bytes read, which may be fewer than n (end
+// of file). If pread() fails with anything other than EINTR, an IOError is
+// returned and *result is empty.
+Status PosixRandomAccessFile::Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const {
+ if (use_direct_io()) {
+ assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
+ assert(IsSectorAligned(n, GetRequiredBufferAlignment()));
+ assert(IsSectorAligned(scratch, GetRequiredBufferAlignment()));
+ }
+ Status s;
+ ssize_t r = -1;
+ size_t left = n;
+ char* ptr = scratch;
+ while (left > 0) {
+ r = pread(fd_, ptr, left, static_cast<off_t>(offset));
+ if (r <= 0) {
+ // Retry when interrupted; stop on EOF (0) or any other error (-1).
+ if (r == -1 && errno == EINTR) {
+ continue;
+ }
+ break;
+ }
+ ptr += r;
+ offset += r;
+ left -= r;
+ if (use_direct_io() &&
+ r % static_cast<ssize_t>(GetRequiredBufferAlignment()) != 0) {
+ // Bytes reads don't fill sectors. Should only happen at the end
+ // of the file.
+ break;
+ }
+ }
+ if (r < 0) {
+ // An error: return a non-ok status
+ // NOTE(review): `offset` has been advanced by earlier successful reads,
+ // so the message shows the failing position, not the caller's offset.
+ s = IOError(
+ "While pread offset " + ToString(offset) + " len " + ToString(n),
+ filename_, errno);
+ }
+ *result = Slice(scratch, (r < 0) ? 0 : n - left);
+ return s;
+}
+
+// Asks the OS to read ahead [offset, offset + n): readahead() on Linux,
+// fcntl(F_RDADVISE) on macOS. Does nothing in direct I/O mode or on other
+// platforms (r stays 0). Returns an IOError when the call returns -1.
+Status PosixRandomAccessFile::Prefetch(uint64_t offset, size_t n) {
+ Status s;
+ if (!use_direct_io()) {
+ ssize_t r = 0;
+#ifdef OS_LINUX
+ r = readahead(fd_, offset, n);
+#endif
+#ifdef OS_MACOSX
+ radvisory advice;
+ advice.ra_offset = static_cast<off_t>(offset);
+ advice.ra_count = static_cast<int>(n);
+ r = fcntl(fd_, F_RDADVISE, &advice);
+#endif
+ if (r == -1) {
+ s = IOError("While prefetching offset " + ToString(offset) + " len " +
+ ToString(n),
+ filename_, errno);
+ }
+ }
+ return s;
+}
+
+#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
+// Forwards to PosixHelper::GetUniqueIdFromFile() for this file's descriptor;
+// only compiled on the platforms that provide that helper.
+size_t PosixRandomAccessFile::GetUniqueId(char* id, size_t max_size) const {
+ return PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
+}
+#endif
+
+// Passes the expected access pattern for the whole file (offset 0, length 0)
+// to Fadvise(). No-op in direct I/O mode. An unknown pattern trips an assert
+// in debug builds and is otherwise ignored.
+void PosixRandomAccessFile::Hint(AccessPattern pattern) {
+  if (use_direct_io()) {
+    return;
+  }
+  // Translate the pattern first, then issue a single advisory call.
+  int advice = 0;
+  switch (pattern) {
+    case NORMAL:
+      advice = POSIX_FADV_NORMAL;
+      break;
+    case RANDOM:
+      advice = POSIX_FADV_RANDOM;
+      break;
+    case SEQUENTIAL:
+      advice = POSIX_FADV_SEQUENTIAL;
+      break;
+    case WILLNEED:
+      advice = POSIX_FADV_WILLNEED;
+      break;
+    case DONTNEED:
+      advice = POSIX_FADV_DONTNEED;
+      break;
+    default:
+      assert(false);
+      return;
+  }
+  Fadvise(fd_, 0, 0, advice);
+}
+
+// Advises the OS (POSIX_FADV_DONTNEED) that the cached pages for
+// [offset, offset + length) are no longer needed. Does nothing and returns
+// OK in direct I/O mode and on non-Linux builds.
+Status PosixRandomAccessFile::InvalidateCache(size_t offset, size_t length) {
+  if (use_direct_io()) {
+    return Status::OK();
+  }
+#ifndef OS_LINUX
+  (void)offset;
+  (void)length;
+  return Status::OK();
+#else
+  // free OS pages
+  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
+  if (ret == 0) {
+    return Status::OK();
+  }
+  // posix_fadvise() returns the error number directly and does not set
+  // errno, so report `ret` rather than a possibly stale errno.
+  return IOError("While fadvise NotNeeded offset " + ToString(offset) +
+                     " len " + ToString(length),
+                 filename_, ret);
+#endif
+}
+
+/*
+ * PosixMmapReadableFile
+ *
+ * mmap() based random-access
+ */
+// base[0,length-1] contains the mmapped contents of the file.
+// The mapping and the descriptor are both released in the destructor.
+// options must request mmap reads and must not request direct reads.
+PosixMmapReadableFile::PosixMmapReadableFile(const int fd,
+ const std::string& fname,
+ void* base, size_t length,
+ const EnvOptions& options)
+ : fd_(fd), filename_(fname), mmapped_region_(base), length_(length) {
+#ifdef NDEBUG
+ // options is only read by the asserts below.
+ (void)options;
+#endif
+ fd_ = fd_ + 0; // suppress the warning for used variables
+ assert(options.use_mmap_reads);
+ assert(!options.use_direct_reads);
+}
+
+// Unmaps the region and closes the descriptor. A munmap() failure is only
+// reported on stdout; the descriptor is closed either way.
+PosixMmapReadableFile::~PosixMmapReadableFile() {
+  const bool unmap_failed = (munmap(mmapped_region_, length_) != 0);
+  if (unmap_failed) {
+    fprintf(stdout, "failed to munmap %p length %" ROCKSDB_PRIszt " \n",
+            mmapped_region_, length_);
+  }
+  close(fd_);
+}
+
+// Points *result directly into the mapped region at `offset`; nothing is
+// copied and scratch is unused. A read that would run past the end of the
+// mapping is shortened to end there. An offset beyond the mapped length
+// yields an IOError (EINVAL) and an empty *result.
+Status PosixMmapReadableFile::Read(uint64_t offset, size_t n, Slice* result,
+                                   char* /*scratch*/) const {
+  Status s;
+  if (offset > length_) {
+    *result = Slice();
+    return IOError("While mmap read offset " + ToString(offset) +
+                       " larger than file length " + ToString(length_),
+                   filename_, EINVAL);
+  }
+  // offset <= length_ holds here, so the subtraction cannot underflow.
+  // Comparing n against the remaining bytes (rather than testing
+  // offset + n > length_) also cannot wrap around for a very large n, which
+  // would otherwise skip the clamp and yield an out-of-range slice.
+  const size_t remaining = static_cast<size_t>(length_ - offset);
+  if (n > remaining) {
+    n = remaining;
+  }
+  *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
+  return s;
+}
+
+// Advises the OS (POSIX_FADV_DONTNEED) that the cached pages for
+// [offset, offset + length) are no longer needed. Does nothing and returns
+// OK on non-Linux builds.
+Status PosixMmapReadableFile::InvalidateCache(size_t offset, size_t length) {
+#ifndef OS_LINUX
+  (void)offset;
+  (void)length;
+  return Status::OK();
+#else
+  // free OS pages
+  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
+  if (ret == 0) {
+    return Status::OK();
+  }
+  // posix_fadvise() returns the error number directly and does not set
+  // errno, so report `ret` rather than a possibly stale errno. The message
+  // also gains the space that was missing after "len".
+  return IOError("While fadvise not needed. Offset " + ToString(offset) +
+                     " len " + ToString(length),
+                 filename_, ret);
+#endif
+}
+
+/*
+ * PosixMmapFile
+ *
+ * We preallocate up to an extra megabyte and use memcpy to append new
+ * data to the file. This is safe since we either properly close the
+ * file before reading from it, or for log files, the reading code
+ * knows enough to skip zero suffixes.
+ */
+// Unmaps the currently mapped window, if any. On success the file offset is
+// advanced by the size of that window, all window pointers are reset to
+// nullptr, and the size used for the next mapping is doubled until it
+// reaches 1MB. Returns an IOError if munmap() fails; OK otherwise, including
+// when nothing is mapped.
+Status PosixMmapFile::UnmapCurrentRegion() {
+  TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
+  if (base_ != nullptr) {
+    int munmap_status = munmap(base_, limit_ - base_);
+    if (munmap_status != 0) {
+      // munmap() returns -1 and reports the cause through errno; passing its
+      // return value as the error number would always describe error "-1".
+      return IOError("While munmap", filename_, errno);
+    }
+    file_offset_ += limit_ - base_;
+    base_ = nullptr;
+    limit_ = nullptr;
+    last_sync_ = nullptr;
+    dst_ = nullptr;
+
+    // Increase the amount we map the next time, but capped at 1MB
+    if (map_size_ < (1 << 20)) {
+      map_size_ *= 2;
+    }
+  }
+  return Status::OK();
+}
+
+// Maps the next map_size_ bytes of the file, starting at file_offset_, as a
+// shared read/write window and points base_/dst_/last_sync_ at its start and
+// limit_ at its end. When fallocate is allowed, the range is allocated first
+// (fallocate(), falling back to posix_fallocate()).
+//
+// Returns an IOError if allocation or mmap() fails, and NotSupported on
+// builds without ROCKSDB_FALLOCATE_PRESENT. Must be called with no region
+// currently mapped.
+Status PosixMmapFile::MapNewRegion() {
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ assert(base_ == nullptr);
+ // NOTE(review): this kill point reuses the "UnmapCurrentRegion:0" label.
+ TEST_KILL_RANDOM("PosixMmapFile::UnmapCurrentRegion:0", rocksdb_kill_odds);
+ // we can't fallocate with FALLOC_FL_KEEP_SIZE here
+ if (allow_fallocate_) {
+ IOSTATS_TIMER_GUARD(allocate_nanos);
+ int alloc_status = fallocate(fd_, 0, file_offset_, map_size_);
+ if (alloc_status != 0) {
+ // fallback to posix_fallocate
+ alloc_status = posix_fallocate(fd_, file_offset_, map_size_);
+ }
+ if (alloc_status != 0) {
+ // alloc_status here is posix_fallocate()'s return value, which is passed
+ // to strerror() as the error number.
+ return Status::IOError("Error allocating space to file : " + filename_ +
+ "Error : " + strerror(alloc_status));
+ }
+ }
+
+ TEST_KILL_RANDOM("PosixMmapFile::Append:1", rocksdb_kill_odds);
+ void* ptr = mmap(nullptr, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED, fd_,
+ file_offset_);
+ if (ptr == MAP_FAILED) {
+ return Status::IOError("MMap failed on " + filename_);
+ }
+ TEST_KILL_RANDOM("PosixMmapFile::Append:2", rocksdb_kill_odds);
+
+ base_ = reinterpret_cast<char*>(ptr);
+ limit_ = base_ + map_size_;
+ dst_ = base_;
+ last_sync_ = base_;
+ return Status::OK();
+#else
+ return Status::NotSupported("This platform doesn't support fallocate()");
+#endif
+}
+
+// Synchronously msync()s the pages of the current window that hold bytes
+// written since the previous Msync(). Returns OK immediately when nothing
+// new was written, and an IOError if msync() fails.
+Status PosixMmapFile::Msync() {
+ if (dst_ == last_sync_) {
+ return Status::OK();
+ }
+ // Find the beginnings of the pages that contain the first and last
+ // bytes to be synced.
+ size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
+ size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
+ // The sync mark is advanced before msync() runs, so it moves even if the
+ // call below fails.
+ last_sync_ = dst_;
+ TEST_KILL_RANDOM("PosixMmapFile::Msync:0", rocksdb_kill_odds);
+ // Length spans from the start of page p1 through the end of page p2.
+ if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
+ return IOError("While msync", filename_, errno);
+ }
+ return Status::OK();
+}
+
+// Starts with no region mapped and file offset 0. The initial mapping size
+// is 65536 rounded up with Roundup() to the given page size. page_size must
+// be a power of two; options must request mmap writes and must not request
+// direct writes.
+PosixMmapFile::PosixMmapFile(const std::string& fname, int fd, size_t page_size,
+ const EnvOptions& options)
+ : filename_(fname),
+ fd_(fd),
+ page_size_(page_size),
+ map_size_(Roundup(65536, page_size)),
+ base_(nullptr),
+ limit_(nullptr),
+ dst_(nullptr),
+ last_sync_(nullptr),
+ file_offset_(0) {
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ allow_fallocate_ = options.allow_fallocate;
+ fallocate_with_keep_size_ = options.fallocate_with_keep_size;
+#else
+ (void)options;
+#endif
+ assert((page_size & (page_size - 1)) == 0);
+ assert(options.use_mmap_writes);
+ assert(!options.use_direct_writes);
+}
+
+// Closes the file if that has not happened yet (Close() sets fd_ to -1).
+// The call is qualified so it is not dispatched virtually.
+PosixMmapFile::~PosixMmapFile() {
+  if (fd_ < 0) {
+    return;
+  }
+  PosixMmapFile::Close();
+}
+
+// Copies `data` into the mapped window, replacing the window with a fresh
+// one each time it is full. Returns the first error from unmapping or
+// mapping; OK once every byte has been copied.
+Status PosixMmapFile::Append(const Slice& data) {
+  const char* cursor = data.data();
+  size_t remaining = data.size();
+  while (remaining > 0) {
+    assert(base_ <= dst_);
+    assert(dst_ <= limit_);
+    const size_t room = limit_ - dst_;
+    if (room == 0) {
+      // Window exhausted (or nothing mapped yet): swap in the next one.
+      Status s = UnmapCurrentRegion();
+      if (s.ok()) {
+        s = MapNewRegion();
+      }
+      if (!s.ok()) {
+        return s;
+      }
+      TEST_KILL_RANDOM("PosixMmapFile::Append:0", rocksdb_kill_odds);
+    }
+
+    // `room` was measured before any remap, so right after a remap this
+    // copies zero bytes and the next iteration re-measures.
+    const size_t chunk = std::min(remaining, room);
+    assert(dst_);
+    memcpy(dst_, cursor, chunk);
+    dst_ += chunk;
+    cursor += chunk;
+    remaining -= chunk;
+  }
+  return Status::OK();
+}
+
+// Unmaps the current window, trims the file back to the end of the data
+// actually written, and closes the descriptor. The descriptor is closed and
+// fd_ set to -1 even if an earlier step failed; the first error encountered
+// is the one returned.
+Status PosixMmapFile::Close() {
+ Status s;
+ // Must be computed before unmapping, which resets limit_ and dst_.
+ size_t unused = limit_ - dst_;
+
+ s = UnmapCurrentRegion();
+ if (!s.ok()) {
+ s = IOError("While closing mmapped file", filename_, errno);
+ } else if (unused > 0) {
+ // Trim the extra space at the end of the file
+ // (file_offset_ was just advanced by the whole window size, so
+ // subtracting the unused tail gives the end of the written data.)
+ if (ftruncate(fd_, file_offset_ - unused) < 0) {
+ s = IOError("While ftruncating mmaped file", filename_, errno);
+ }
+ }
+
+ if (close(fd_) < 0) {
+ // Keep an earlier error rather than overwriting it.
+ if (s.ok()) {
+ s = IOError("While closing mmapped file", filename_, errno);
+ }
+ }
+
+ fd_ = -1;
+ base_ = nullptr;
+ limit_ = nullptr;
+ return s;
+}
+
+// Nothing to do here: always reports success.
+Status PosixMmapFile::Flush() {
+  return Status::OK();
+}
+
+// Runs fdatasync() on the descriptor and then Msync() on the mapped window.
+// Returns an IOError if fdatasync() fails, otherwise Msync()'s status.
+Status PosixMmapFile::Sync() {
+  const int rc = fdatasync(fd_);
+  if (rc >= 0) {
+    return Msync();
+  }
+  return IOError("While fdatasync mmapped file", filename_, errno);
+}
+
+/**
+ * Flush data as well as metadata to stable storage: fsync() on the
+ * descriptor followed by Msync() on the mapped window. Returns an IOError
+ * if fsync() fails, otherwise Msync()'s status.
+ */
+Status PosixMmapFile::Fsync() {
+  const int rc = fsync(fd_);
+  if (rc >= 0) {
+    return Msync();
+  }
+  return IOError("While fsync mmaped file", filename_, errno);
+}
+
+/**
+ * Get the size of valid data in the file. This will not match the
+ * size that is returned from the filesystem because we use mmap
+ * to extend file by map_size every time.
+ *
+ * Computed as the offset of the current window plus the bytes written
+ * into it so far.
+ */
+uint64_t PosixMmapFile::GetFileSize() {
+  const size_t written_in_window = dst_ - base_;
+  return file_offset_ + written_in_window;
+}
+
+// Advises the OS (POSIX_FADV_DONTNEED) that the cached pages for
+// [offset, offset + length) are no longer needed. Does nothing and returns
+// OK on non-Linux builds.
+Status PosixMmapFile::InvalidateCache(size_t offset, size_t length) {
+#ifndef OS_LINUX
+  (void)offset;
+  (void)length;
+  return Status::OK();
+#else
+  // free OS pages
+  int ret = Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED);
+  if (ret == 0) {
+    return Status::OK();
+  }
+  // posix_fadvise() returns the error number directly and does not set
+  // errno, so report `ret` rather than a possibly stale errno.
+  return IOError("While fadvise NotNeeded mmapped file", filename_, ret);
+#endif
+}
+
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+// Reserves space for [offset, offset + len) with fallocate(), using
+// FALLOC_FL_KEEP_SIZE when so configured. Returns OK without doing anything
+// when fallocate is disabled for this file, and an IOError if fallocate()
+// fails.
+Status PosixMmapFile::Allocate(uint64_t offset, uint64_t len) {
+  assert(offset <= std::numeric_limits<off_t>::max());
+  assert(len <= std::numeric_limits<off_t>::max());
+  TEST_KILL_RANDOM("PosixMmapFile::Allocate:0", rocksdb_kill_odds);
+  if (!allow_fallocate_) {
+    return Status::OK();
+  }
+  const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
+  const int alloc_status = fallocate(fd_, mode, static_cast<off_t>(offset),
+                                     static_cast<off_t>(len));
+  if (alloc_status != 0) {
+    return IOError(
+        "While fallocate offset " + ToString(offset) + " len " + ToString(len),
+        filename_, errno);
+  }
+  return Status::OK();
+}
+#endif
+
+/*
+ * PosixWritableFile
+ *
+ * Use posix write to write data to a file.
+ *
+ * The constructor records the name, descriptor and direct-I/O flag, starts
+ * the tracked size at zero and caches GetLogicalBufferSize(fd_). Opening
+ * with options.use_mmap_writes set is a programming error (asserted).
+ */
+PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
+                                     const EnvOptions& options)
+    : filename_(fname),
+      use_direct_io_(options.use_direct_writes),
+      fd_(fd),
+      filesize_(0),
+      logical_sector_size_(GetLogicalBufferSize(fd_)) {
+  assert(!options.use_mmap_writes);
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+  fallocate_with_keep_size_ = options.fallocate_with_keep_size;
+  allow_fallocate_ = options.allow_fallocate;
+#endif
+}
+
+// Closes the file if it is still open (fd_ >= 0). The call is qualified so
+// that this class's own Close() runs.
+PosixWritableFile::~PosixWritableFile() {
+  if (fd_ < 0) {
+    return;
+  }
+  PosixWritableFile::Close();
+}
+
+// Writes all of `data` at the descriptor's current position, looping over
+// short writes and retrying when write() is interrupted (EINTR). On success
+// filesize_ grows by data.size(); any other write() error is returned as an
+// IOError and filesize_ is left untouched.
+Status PosixWritableFile::Append(const Slice& data) {
+  if (use_direct_io()) {
+    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
+    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
+  }
+  const char* cursor = data.data();
+  size_t remaining = data.size();
+  while (remaining > 0) {
+    const ssize_t written = write(fd_, cursor, remaining);
+    if (written >= 0) {
+      remaining -= written;
+      cursor += written;
+      continue;
+    }
+    if (errno != EINTR) {
+      return IOError("While appending to file", filename_, errno);
+    }
+    // Interrupted before anything was written: try again.
+  }
+  filesize_ += data.size();
+  return Status::OK();
+}
+
+// Writes all of `data` starting at file position `offset` using pwrite(),
+// looping over short writes and retrying on EINTR. On success filesize_ is
+// set to the offset just past the last byte written.
+Status PosixWritableFile::PositionedAppend(const Slice& data, uint64_t offset) {
+  if (use_direct_io()) {
+    assert(IsSectorAligned(offset, GetRequiredBufferAlignment()));
+    assert(IsSectorAligned(data.size(), GetRequiredBufferAlignment()));
+    assert(IsSectorAligned(data.data(), GetRequiredBufferAlignment()));
+  }
+  assert(offset <= std::numeric_limits<off_t>::max());
+  const char* cursor = data.data();
+  size_t remaining = data.size();
+  while (remaining > 0) {
+    const ssize_t written =
+        pwrite(fd_, cursor, remaining, static_cast<off_t>(offset));
+    if (written >= 0) {
+      remaining -= written;
+      offset += written;
+      cursor += written;
+      continue;
+    }
+    if (errno != EINTR) {
+      return IOError("While pwrite to file at offset " + ToString(offset),
+                     filename_, errno);
+    }
+    // Interrupted before anything was written: try again.
+  }
+  filesize_ = offset;
+  return Status::OK();
+}
+
+// Resizes the file to `size` with ftruncate(). filesize_ is updated only
+// when the call succeeds; a failure is returned as an IOError.
+Status PosixWritableFile::Truncate(uint64_t size) {
+  if (ftruncate(fd_, size) < 0) {
+    return IOError("While ftruncate file to size " + ToString(size), filename_,
+                   errno);
+  }
+  filesize_ = size;
+  return Status();
+}
+
+// Closes the file. If GetPreallocationStatus() reports at least one
+// preallocated block, the file is first cut back to filesize_; failures of
+// that trimming step are ignored. Only a failing close() is reported to the
+// caller, and fd_ is reset to -1 whether or not close() succeeded.
+Status PosixWritableFile::Close() {
+ Status s;
+
+ size_t block_size;
+ size_t last_allocated_block;
+ GetPreallocationStatus(&block_size, &last_allocated_block);
+ if (last_allocated_block > 0) {
+ // trim the extra space preallocated at the end of the file
+ // NOTE(ljin): we probably don't want to surface failure as an IOError,
+ // but it will be nice to log these errors.
+ // The result is stored only so the return value is not discarded.
+ int dummy __attribute__((__unused__));
+ dummy = ftruncate(fd_, filesize_);
+#if defined(ROCKSDB_FALLOCATE_PRESENT) && defined(FALLOC_FL_PUNCH_HOLE) && \
+ !defined(TRAVIS)
+ // in some file systems, ftruncate only trims trailing space if the
+ // new file size is smaller than the current size. Calling fallocate
+ // with FALLOC_FL_PUNCH_HOLE flag to explicitly release these unused
+ // blocks. FALLOC_FL_PUNCH_HOLE is supported on at least the following
+ // filesystems:
+ // XFS (since Linux 2.6.38)
+ // ext4 (since Linux 3.0)
+ // Btrfs (since Linux 3.7)
+ // tmpfs (since Linux 3.5)
+ // We ignore error since failure of this operation does not affect
+ // correctness.
+ // TRAVIS - this code does not work on TRAVIS filesystems.
+ // the FALLOC_FL_KEEP_SIZE option is expected to not change the size
+ // of the file, but it does. Simple strace report will show that.
+ // While we work with Travis-CI team to figure out if this is a
+ // quirk of Docker/AUFS, we will comment this out.
+ struct stat file_stats;
+ int result = fstat(fd_, &file_stats);
+ // After ftruncate, we check whether ftruncate has the correct behavior.
+ // If not, we should hack it with FALLOC_FL_PUNCH_HOLE
+ // The test compares the number of st_blksize-sized blocks implied by
+ // st_size (rounded up) with st_blocks rescaled by st_blksize / 512.
+ if (result == 0 &&
+ (file_stats.st_size + file_stats.st_blksize - 1) /
+ file_stats.st_blksize !=
+ file_stats.st_blocks / (file_stats.st_blksize / 512)) {
+ IOSTATS_TIMER_GUARD(allocate_nanos);
+ if (allow_fallocate_) {
+ // Punch out the range from filesize_ to the end of the last
+ // preallocated block; the return value is deliberately ignored.
+ fallocate(fd_, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, filesize_,
+ block_size * last_allocated_block - filesize_);
+ }
+ }
+#endif
+ }
+
+ if (close(fd_) < 0) {
+ s = IOError("While closing file after writing", filename_, errno);
+ }
+ // Mark the descriptor as gone so the destructor does not close it again.
+ fd_ = -1;
+ return s;
+}
+
+// write out the cached data to the OS cache
+// (this implementation performs no work and always returns OK)
+Status PosixWritableFile::Flush() {
+  return Status::OK();
+}
+
+// Runs fdatasync() on the descriptor and maps a failure to an IOError.
+Status PosixWritableFile::Sync() {
+  const int rc = fdatasync(fd_);
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  return IOError("While fdatasync", filename_, errno);
+}
+
+// Runs fsync() on the descriptor and maps a failure to an IOError.
+Status PosixWritableFile::Fsync() {
+  const int rc = fsync(fd_);
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  return IOError("While fsync", filename_, errno);
+}
+
+// Always reports true for this file type.
+bool PosixWritableFile::IsSyncThreadSafe() const {
+  return true;
+}
+
+// Returns the size tracked in filesize_; the filesystem is not queried.
+uint64_t PosixWritableFile::GetFileSize() {
+  return filesize_;
+}
+
+// Passes a write-lifetime hint to the kernel through fcntl(F_SET_RW_HINT).
+// The cached write_hint_ is updated only when the fcntl succeeds, and the
+// call is skipped when the hint is unchanged. Outside Linux, or when built
+// for a Valgrind run, the hint is ignored.
+void PosixWritableFile::SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) {
+#if defined(OS_LINUX) && !defined(ROCKSDB_VALGRIND_RUN)
+  // Valgrind builds are excluded to suppress its "Unimplemented
+  // functionality" error.
+  const bool changed = (hint != write_hint_);
+  if (changed && fcntl(fd_, F_SET_RW_HINT, &hint) == 0) {
+    write_hint_ = hint;
+  }
+#else
+  (void)hint;
+#endif  // OS_LINUX && !ROCKSDB_VALGRIND_RUN
+}
+
+// Asks the OS to drop cached pages for [offset, offset + length). Nothing
+// is done for direct-I/O files or when OS_LINUX is not defined.
+// NOTE(review): if Fadvise() forwards to posix_fadvise(), the error code is
+// the return value rather than errno -- confirm before relying on the
+// errno passed to IOError() below.
+Status PosixWritableFile::InvalidateCache(size_t offset, size_t length) {
+  if (use_direct_io()) {
+    return Status::OK();
+  }
+#ifdef OS_LINUX
+  // free OS pages
+  if (Fadvise(fd_, offset, length, POSIX_FADV_DONTNEED) != 0) {
+    return IOError("While fadvise NotNeeded", filename_, errno);
+  }
+  return Status::OK();
+#else
+  (void)offset;
+  (void)length;
+  return Status::OK();
+#endif
+}
+
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+// Reserves `len` bytes starting at `offset` via fallocate(). When
+// allow_fallocate_ is false nothing is called and OK is returned.
+// FALLOC_FL_KEEP_SIZE is passed only if fallocate_with_keep_size_ is set.
+// The whole call is covered by the allocate_nanos timer guard.
+Status PosixWritableFile::Allocate(uint64_t offset, uint64_t len) {
+  assert(offset <= std::numeric_limits<off_t>::max());
+  assert(len <= std::numeric_limits<off_t>::max());
+  TEST_KILL_RANDOM("PosixWritableFile::Allocate:0", rocksdb_kill_odds);
+  IOSTATS_TIMER_GUARD(allocate_nanos);
+  if (!allow_fallocate_) {
+    return Status::OK();
+  }
+  const int mode = fallocate_with_keep_size_ ? FALLOC_FL_KEEP_SIZE : 0;
+  const int rc =
+      fallocate(fd_, mode, static_cast<off_t>(offset), static_cast<off_t>(len));
+  if (rc != 0) {
+    return IOError(
+        "While fallocate offset " + ToString(offset) + " len " + ToString(len),
+        filename_, errno);
+  }
+  return Status::OK();
+}
+#endif
+
+#ifdef ROCKSDB_RANGESYNC_PRESENT
+// Calls sync_file_range() with SYNC_FILE_RANGE_WRITE for `nbytes` bytes
+// starting at `offset`; a non-zero result becomes an IOError.
+Status PosixWritableFile::RangeSync(uint64_t offset, uint64_t nbytes) {
+  assert(offset <= std::numeric_limits<off_t>::max());
+  assert(nbytes <= std::numeric_limits<off_t>::max());
+  const int rc = sync_file_range(fd_, static_cast<off_t>(offset),
+                                 static_cast<off_t>(nbytes),
+                                 SYNC_FILE_RANGE_WRITE);
+  if (rc != 0) {
+    return IOError("While sync_file_range offset " + ToString(offset) +
+                       " bytes " + ToString(nbytes),
+                   filename_, errno);
+  }
+  return Status::OK();
+}
+#endif
+
+#ifdef OS_LINUX
+// Forwards to PosixHelper::GetUniqueIdFromFile() for this file's descriptor
+// and returns its result unchanged.
+size_t PosixWritableFile::GetUniqueId(char* id, size_t max_size) const {
+  const size_t id_len = PosixHelper::GetUniqueIdFromFile(fd_, id, max_size);
+  return id_len;
+}
+#endif
+
+/*
+ * PosixRandomRWFile
+ *
+ * The constructor only stores the file name and descriptor; the EnvOptions
+ * argument is not used.
+ */
+
+PosixRandomRWFile::PosixRandomRWFile(const std::string& fname, int fd,
+                                     const EnvOptions& /*options*/)
+    : filename_(fname),
+      fd_(fd) {
+}
+
+// Closes the descriptor if it is still open (fd_ >= 0).
+PosixRandomRWFile::~PosixRandomRWFile() {
+  if (fd_ < 0) {
+    return;
+  }
+  Close();
+}
+
+// Writes all of `data` at file position `offset` with pwrite(), looping
+// over short writes and retrying when the call is interrupted (EINTR). Any
+// other failure is returned as an IOError.
+Status PosixRandomRWFile::Write(uint64_t offset, const Slice& data) {
+  const char* cursor = data.data();
+  size_t remaining = data.size();
+  while (remaining > 0) {
+    const ssize_t written = pwrite(fd_, cursor, remaining, offset);
+    if (written >= 0) {
+      // Advance past the `written` bytes that made it to the file.
+      remaining -= written;
+      offset += written;
+      cursor += written;
+      continue;
+    }
+    if (errno != EINTR) {
+      // A real error, not an interruption.
+      return IOError(
+          "While write random read/write file at offset " + ToString(offset),
+          filename_, errno);
+    }
+    // Interrupted: go around again.
+  }
+
+  return Status::OK();
+}
+
+// Reads up to `n` bytes from file position `offset` into `scratch` with
+// pread(), retrying on EINTR and stopping early when pread() returns 0.
+// On success `*result` refers to the bytes actually read, which may be
+// fewer than `n`.
+Status PosixRandomRWFile::Read(uint64_t offset, size_t n, Slice* result,
+                               char* scratch) const {
+  char* out = scratch;
+  size_t remaining = n;
+  while (remaining > 0) {
+    const ssize_t got = pread(fd_, out, remaining, offset);
+    if (got == 0) {
+      // Nothing more to read
+      break;
+    }
+    if (got < 0) {
+      if (errno == EINTR) {
+        // read was interrupted, try again.
+        continue;
+      }
+      return IOError("While reading random read/write file offset " +
+                         ToString(offset) + " len " + ToString(n),
+                     filename_, errno);
+    }
+
+    // Consumed `got` bytes
+    out += got;
+    offset += got;
+    remaining -= got;
+  }
+
+  *result = Slice(scratch, n - remaining);
+  return Status::OK();
+}
+
+// No work is performed here; the call always reports success.
+Status PosixRandomRWFile::Flush() {
+  return Status::OK();
+}
+
+// Runs fdatasync() on the descriptor and maps a failure to an IOError.
+Status PosixRandomRWFile::Sync() {
+  const int rc = fdatasync(fd_);
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  return IOError("While fdatasync random read/write file", filename_, errno);
+}
+
+// Runs fsync() on the descriptor and maps a failure to an IOError.
+Status PosixRandomRWFile::Fsync() {
+  const int rc = fsync(fd_);
+  if (rc >= 0) {
+    return Status::OK();
+  }
+  return IOError("While fsync random read/write file", filename_, errno);
+}
+
+// Closes the descriptor and reports a failing close() as an IOError.
+//
+// fd_ is reset to -1 even when close() fails. Otherwise the destructor,
+// which calls Close() whenever fd_ >= 0, would close the same descriptor
+// number a second time; by then that number may belong to an unrelated
+// file opened elsewhere in the process. This matches what
+// PosixWritableFile::Close() does.
+Status PosixRandomRWFile::Close() {
+  Status s;
+  if (close(fd_) < 0) {
+    s = IOError("While close random read/write file", filename_, errno);
+  }
+  fd_ = -1;
+  return s;
+}
+
+// Unmaps the region [base_, base_ + length_). The munmap() result is not
+// checked.
+PosixMemoryMappedFileBuffer::~PosixMemoryMappedFileBuffer() {
+  // TODO should have error handling though not much we can do...
+  munmap(this->base_, this->length_);
+}
+
+/*
+ * PosixDirectory
+ *
+ * The destructor closes the directory descriptor; the close() result is
+ * not checked.
+ */
+
+PosixDirectory::~PosixDirectory() {
+  close(fd_);
+}
+
+// Runs fsync() on the directory descriptor and maps a failure to an
+// IOError. When OS_AIX is defined the call is compiled out and OK is
+// returned unconditionally.
+Status PosixDirectory::Fsync() {
+#ifndef OS_AIX
+  const int rc = fsync(fd_);
+  if (rc == -1) {
+    return IOError("While fsync", "a directory", errno);
+  }
+#endif
+  return Status::OK();
+}
+} // namespace rocksdb
+#endif
diff --git a/src/rocksdb/env/io_posix.h b/src/rocksdb/env/io_posix.h
new file mode 100644
index 00000000..e6824d3e
--- /dev/null
+++ b/src/rocksdb/env/io_posix.h
@@ -0,0 +1,258 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+#include <errno.h>
+#include <unistd.h>
+#include <atomic>
+#include <string>
+#include "rocksdb/env.h"
+
+// On non-Linux platforms the following macros are defined only as
+// placeholders.
+#if !(defined OS_LINUX) && !(defined CYGWIN) && !(defined OS_AIX)
+#define POSIX_FADV_NORMAL 0 /* [MC1] no further special treatment */
+#define POSIX_FADV_RANDOM 1 /* [MC1] expect random page refs */
+#define POSIX_FADV_SEQUENTIAL 2 /* [MC1] expect sequential page refs */
+#define POSIX_FADV_WILLNEED 3 /* [MC1] will need these pages */
+#define POSIX_FADV_DONTNEED 4 /* [MC1] dont need these pages */
+#endif
+
+namespace rocksdb {
+// Builds the message text for an I/O error: `context` alone when
+// `file_name` is empty, otherwise "<context>: <file_name>".
+static std::string IOErrorMsg(const std::string& context,
+                              const std::string& file_name) {
+  return file_name.empty() ? context : context + ": " + file_name;
+}
+
+// Converts an errno value into a Status. file_name can be left empty if it
+// is unknown.
+//   ENOSPC -> Status::NoSpace
+//   ESTALE -> Status::IOError(Status::kStaleFile), without the context text
+//   ENOENT -> Status::PathNotFound
+//   other  -> Status::IOError
+// Except for ESTALE, the message is IOErrorMsg(context, file_name) together
+// with strerror(err_number).
+static Status IOError(const std::string& context, const std::string& file_name,
+                      int err_number) {
+  if (err_number == ESTALE) {
+    return Status::IOError(Status::kStaleFile);
+  }
+  if (err_number == ENOSPC) {
+    return Status::NoSpace(IOErrorMsg(context, file_name),
+                           strerror(err_number));
+  }
+  if (err_number == ENOENT) {
+    return Status::PathNotFound(IOErrorMsg(context, file_name),
+                                strerror(err_number));
+  }
+  return Status::IOError(IOErrorMsg(context, file_name),
+                         strerror(err_number));
+}
+
+// Holder for static helpers shared by the POSIX file classes.
+class PosixHelper {
+ public:
+ // Writes an identifier for the file behind `fd` into `id` (at most
+ // `max_size` bytes, judging by the parameter name) and returns a size_t.
+ // The definition is not in this header.
+ static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size);
+};
+
+// SequentialFile implementation constructed from a FILE* and a descriptor.
+// All non-trivial methods are defined out of line.
+class PosixSequentialFile : public SequentialFile {
+ private:
+ std::string filename_;
+ FILE* file_;
+ int fd_;
+ bool use_direct_io_;
+ // Returned by GetRequiredBufferAlignment().
+ size_t logical_sector_size_;
+
+ public:
+ PosixSequentialFile(const std::string& fname, FILE* file, int fd,
+ const EnvOptions& options);
+ virtual ~PosixSequentialFile();
+
+ virtual Status Read(size_t n, Slice* result, char* scratch) override;
+ virtual Status PositionedRead(uint64_t offset, size_t n, Slice* result,
+ char* scratch) override;
+ virtual Status Skip(uint64_t n) override;
+ virtual Status InvalidateCache(size_t offset, size_t length) override;
+ virtual bool use_direct_io() const override { return use_direct_io_; }
+ virtual size_t GetRequiredBufferAlignment() const override {
+ return logical_sector_size_;
+ }
+};
+
+// RandomAccessFile implementation constructed from a descriptor. All
+// non-trivial methods are defined out of line.
+class PosixRandomAccessFile : public RandomAccessFile {
+ protected:
+ std::string filename_;
+ int fd_;
+ bool use_direct_io_;
+ // Returned by GetRequiredBufferAlignment().
+ size_t logical_sector_size_;
+
+ public:
+ PosixRandomAccessFile(const std::string& fname, int fd,
+ const EnvOptions& options);
+ virtual ~PosixRandomAccessFile();
+
+ virtual Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override;
+
+ virtual Status Prefetch(uint64_t offset, size_t n) override;
+
+ // GetUniqueId is only overridden on the platforms listed here.
+#if defined(OS_LINUX) || defined(OS_MACOSX) || defined(OS_AIX)
+ virtual size_t GetUniqueId(char* id, size_t max_size) const override;
+#endif
+ virtual void Hint(AccessPattern pattern) override;
+ virtual Status InvalidateCache(size_t offset, size_t length) override;
+ virtual bool use_direct_io() const override { return use_direct_io_; }
+ virtual size_t GetRequiredBufferAlignment() const override {
+ return logical_sector_size_;
+ }
+};
+
+// WritableFile implementation constructed from a descriptor. Several
+// overrides are compiled in only when the matching feature macro is
+// defined (see the #ifdef blocks below).
+class PosixWritableFile : public WritableFile {
+ protected:
+ const std::string filename_;
+ const bool use_direct_io_;
+ int fd_;
+ // Size tracked by this object; this is what GetFileSize() reports.
+ uint64_t filesize_;
+ // Returned by GetRequiredBufferAlignment().
+ size_t logical_sector_size_;
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ bool allow_fallocate_;
+ bool fallocate_with_keep_size_;
+#endif
+
+ public:
+ explicit PosixWritableFile(const std::string& fname, int fd,
+ const EnvOptions& options);
+ virtual ~PosixWritableFile();
+
+ // Need to implement this so the file is truncated correctly
+ // with direct I/O
+ virtual Status Truncate(uint64_t size) override;
+ virtual Status Close() override;
+ virtual Status Append(const Slice& data) override;
+ virtual Status PositionedAppend(const Slice& data, uint64_t offset) override;
+ virtual Status Flush() override;
+ virtual Status Sync() override;
+ virtual Status Fsync() override;
+ virtual bool IsSyncThreadSafe() const override;
+ virtual bool use_direct_io() const override { return use_direct_io_; }
+ virtual void SetWriteLifeTimeHint(Env::WriteLifeTimeHint hint) override;
+ virtual uint64_t GetFileSize() override;
+ virtual Status InvalidateCache(size_t offset, size_t length) override;
+ virtual size_t GetRequiredBufferAlignment() const override {
+ return logical_sector_size_;
+ }
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ virtual Status Allocate(uint64_t offset, uint64_t len) override;
+#endif
+#ifdef ROCKSDB_RANGESYNC_PRESENT
+ virtual Status RangeSync(uint64_t offset, uint64_t nbytes) override;
+#endif
+#ifdef OS_LINUX
+ virtual size_t GetUniqueId(char* id, size_t max_size) const override;
+#endif
+};
+
+// mmap() based random-access
+// The constructor receives an existing mapping (base pointer and length)
+// along with the descriptor and file name; methods are defined out of
+// line.
+class PosixMmapReadableFile : public RandomAccessFile {
+ private:
+ int fd_;
+ std::string filename_;
+ void* mmapped_region_;
+ size_t length_;
+
+ public:
+ PosixMmapReadableFile(const int fd, const std::string& fname, void* base,
+ size_t length, const EnvOptions& options);
+ virtual ~PosixMmapReadableFile();
+ virtual Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override;
+ virtual Status InvalidateCache(size_t offset, size_t length) override;
+};
+
+// WritableFile implementation that writes through a memory-mapped region.
+// The pointer members below describe the currently mapped region; the
+// mapping logic itself is defined out of line.
+class PosixMmapFile : public WritableFile {
+ private:
+ std::string filename_;
+ int fd_;
+ size_t page_size_;
+ size_t map_size_; // How much extra memory to map at a time
+ char* base_; // The mapped region
+ char* limit_; // Limit of the mapped region
+ char* dst_; // Where to write next (in range [base_,limit_])
+ char* last_sync_; // Where have we synced up to
+ uint64_t file_offset_; // Offset of base_ in file
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ bool allow_fallocate_; // If false, fallocate calls are bypassed
+ bool fallocate_with_keep_size_;
+#endif
+
+ // Roundup x to a multiple of y
+ static size_t Roundup(size_t x, size_t y) { return ((x + y - 1) / y) * y; }
+
+ // Rounds s down to a multiple of page_size_. The bit-mask form is only
+ // correct when page_size_ is a power of two; the assert checks the result.
+ size_t TruncateToPageBoundary(size_t s) {
+ s -= (s & (page_size_ - 1));
+ assert((s % page_size_) == 0);
+ return s;
+ }
+
+ Status MapNewRegion();
+ Status UnmapCurrentRegion();
+ Status Msync();
+
+ public:
+ PosixMmapFile(const std::string& fname, int fd, size_t page_size,
+ const EnvOptions& options);
+ ~PosixMmapFile();
+
+ // Means Close() will properly take care of truncate
+ // and it does not need any additional information
+ virtual Status Truncate(uint64_t /*size*/) override { return Status::OK(); }
+ virtual Status Close() override;
+ virtual Status Append(const Slice& data) override;
+ virtual Status Flush() override;
+ virtual Status Sync() override;
+ virtual Status Fsync() override;
+ virtual uint64_t GetFileSize() override;
+ virtual Status InvalidateCache(size_t offset, size_t length) override;
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+ virtual Status Allocate(uint64_t offset, uint64_t len) override;
+#endif
+};
+
+// RandomRWFile implementation constructed from a descriptor; it supports
+// positioned reads and writes on the same file. Methods are defined out of
+// line.
+class PosixRandomRWFile : public RandomRWFile {
+ public:
+ explicit PosixRandomRWFile(const std::string& fname, int fd,
+ const EnvOptions& options);
+ virtual ~PosixRandomRWFile();
+
+ virtual Status Write(uint64_t offset, const Slice& data) override;
+
+ virtual Status Read(uint64_t offset, size_t n, Slice* result,
+ char* scratch) const override;
+
+ virtual Status Flush() override;
+ virtual Status Sync() override;
+ virtual Status Fsync() override;
+ virtual Status Close() override;
+
+ private:
+ const std::string filename_;
+ int fd_;
+};
+
+// MemoryMappedFileBuffer subclass that forwards the base pointer and length
+// to the parent constructor and adds its own out-of-line destructor.
+struct PosixMemoryMappedFileBuffer : public MemoryMappedFileBuffer {
+ PosixMemoryMappedFileBuffer(void* _base, size_t _length)
+ : MemoryMappedFileBuffer(_base, _length) {}
+ virtual ~PosixMemoryMappedFileBuffer();
+};
+
+// Directory implementation wrapping a descriptor handed to the
+// constructor.
+class PosixDirectory : public Directory {
+ public:
+ explicit PosixDirectory(int fd) : fd_(fd) {}
+ ~PosixDirectory();
+ virtual Status Fsync() override;
+
+ private:
+ int fd_;
+};
+
+} // namespace rocksdb
diff --git a/src/rocksdb/env/mock_env.cc b/src/rocksdb/env/mock_env.cc
new file mode 100644
index 00000000..793a0837
--- /dev/null
+++ b/src/rocksdb/env/mock_env.cc
@@ -0,0 +1,775 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "env/mock_env.h"
+#include <algorithm>
+#include <chrono>
+#include "port/sys_time.h"
+#include "util/cast_util.h"
+#include "util/murmurhash.h"
+#include "util/random.h"
+#include "util/rate_limiter.h"
+
+namespace rocksdb {
+
+// In-memory file: the contents live in a std::string guarded by mutex_.
+// Instances are reference counted; the object deletes itself when Unref()
+// brings the count to zero or below. size_, modified_time_ and
+// fsynced_bytes_ are atomics and are read without taking the mutex.
+class MemFile {
+ public:
+ // Starts with a zero reference count, empty contents and the current time
+ // as modification time. The random generator is seeded from a hash of the
+ // file name.
+ explicit MemFile(Env* env, const std::string& fn, bool _is_lock_file = false)
+ : env_(env),
+ fn_(fn),
+ refs_(0),
+ is_lock_file_(_is_lock_file),
+ locked_(false),
+ size_(0),
+ modified_time_(Now()),
+ rnd_(static_cast<uint32_t>(
+ MurmurHash(fn.data(), static_cast<int>(fn.size()), 0))),
+ fsynced_bytes_(0) {}
+
+ // Adds one reference.
+ void Ref() {
+ MutexLock lock(&mutex_);
+ ++refs_;
+ }
+
+ bool is_lock_file() const { return is_lock_file_; }
+
+ // Tries to take the lock flag. Returns false if it was already set.
+ // Only valid on lock files (asserted).
+ bool Lock() {
+ assert(is_lock_file_);
+ MutexLock lock(&mutex_);
+ if (locked_) {
+ return false;
+ } else {
+ locked_ = true;
+ return true;
+ }
+ }
+
+ // Clears the lock flag. Only valid on lock files (asserted).
+ void Unlock() {
+ assert(is_lock_file_);
+ MutexLock lock(&mutex_);
+ locked_ = false;
+ }
+
+ // Drops one reference and deletes the object once none remain. The delete
+ // happens after the mutex guard has gone out of scope.
+ void Unref() {
+ bool do_delete = false;
+ {
+ MutexLock lock(&mutex_);
+ --refs_;
+ assert(refs_ >= 0);
+ if (refs_ <= 0) {
+ do_delete = true;
+ }
+ }
+
+ if (do_delete) {
+ delete this;
+ }
+ }
+
+ uint64_t Size() const { return size_; }
+
+ // Shrinks the file to `size`. A size that is not smaller than the current
+ // one leaves the file unchanged.
+ void Truncate(size_t size) {
+ MutexLock lock(&mutex_);
+ if (size < size_) {
+ data_.resize(size);
+ size_ = size;
+ }
+ }
+
+ // Overwrites up to 512 bytes with random values, starting at a random
+ // position inside the region past fsynced_bytes_. Does nothing when
+ // everything has been fsynced. The range is computed before the mutex is
+ // taken.
+ void CorruptBuffer() {
+ if (fsynced_bytes_ >= size_) {
+ return;
+ }
+ uint64_t buffered_bytes = size_ - fsynced_bytes_;
+ uint64_t start =
+ fsynced_bytes_ + rnd_.Uniform(static_cast<int>(buffered_bytes));
+ uint64_t end = std::min(start + 512, size_.load());
+ MutexLock lock(&mutex_);
+ for (uint64_t pos = start; pos < end; ++pos) {
+ data_[static_cast<size_t>(pos)] = static_cast<char>(rnd_.Uniform(256));
+ }
+ }
+
+ // Reads up to `n` bytes starting at `offset`, clamped to what is
+ // available. With a non-null `scratch` the bytes are copied there;
+ // otherwise `*result` points directly into the internal buffer.
+ Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const {
+ MutexLock lock(&mutex_);
+ // Zero when offset is at or beyond the end of the file.
+ const uint64_t available = Size() - std::min(Size(), offset);
+ size_t offset_ = static_cast<size_t>(offset);
+ if (n > available) {
+ n = static_cast<size_t>(available);
+ }
+ if (n == 0) {
+ *result = Slice();
+ return Status::OK();
+ }
+ if (scratch) {
+ memcpy(scratch, &(data_[offset_]), n);
+ *result = Slice(scratch, n);
+ } else {
+ *result = Slice(&(data_[offset_]), n);
+ }
+ return Status::OK();
+ }
+
+ // Writes `data` at `offset`, growing the buffer first when the write
+ // extends past its current end, and refreshes the modification time.
+ Status Write(uint64_t offset, const Slice& data) {
+ MutexLock lock(&mutex_);
+ size_t offset_ = static_cast<size_t>(offset);
+ if (offset + data.size() > data_.size()) {
+ data_.resize(offset_ + data.size());
+ }
+ data_.replace(offset_, data.size(), data.data(), data.size());
+ size_ = data_.size();
+ modified_time_ = Now();
+ return Status::OK();
+ }
+
+ // Appends `data` to the end of the buffer and refreshes the modification
+ // time.
+ Status Append(const Slice& data) {
+ MutexLock lock(&mutex_);
+ data_.append(data.data(), data.size());
+ size_ = data_.size();
+ modified_time_ = Now();
+ return Status::OK();
+ }
+
+ // Records the current size as fsynced; the mutex is not taken.
+ Status Fsync() {
+ fsynced_bytes_ = size_.load();
+ return Status::OK();
+ }
+
+ uint64_t ModifiedTime() const { return modified_time_; }
+
+ private:
+ // Current time as reported by env_->GetCurrentTime(). The status is only
+ // checked by an assert.
+ uint64_t Now() {
+ int64_t unix_time = 0;
+ auto s = env_->GetCurrentTime(&unix_time);
+ assert(s.ok());
+ return static_cast<uint64_t>(unix_time);
+ }
+
+ // Private since only Unref() should be used to delete it.
+ ~MemFile() { assert(refs_ == 0); }
+
+ // No copying allowed.
+ MemFile(const MemFile&);
+ void operator=(const MemFile&);
+
+ Env* env_;
+ const std::string fn_;
+ mutable port::Mutex mutex_;
+ int refs_;
+ bool is_lock_file_;
+ bool locked_;
+
+ // Data written into this file, all bytes before fsynced_bytes are
+ // persistent.
+ std::string data_;
+ std::atomic<uint64_t> size_;
+ std::atomic<uint64_t> modified_time_;
+
+ Random rnd_;
+ std::atomic<uint64_t> fsynced_bytes_;
+};
+
+namespace {
+
+// SequentialFile backed by a MemFile. Holds one reference on the MemFile
+// for its whole lifetime and keeps its own read position.
+class MockSequentialFile : public SequentialFile {
+ public:
+  explicit MockSequentialFile(MemFile* file) : file_(file), pos_(0) {
+    file_->Ref();
+  }
+
+  ~MockSequentialFile() override { file_->Unref(); }
+
+  // Reads from the current position and advances it by the number of bytes
+  // actually returned.
+  Status Read(size_t n, Slice* result, char* scratch) override {
+    const Status read_status = file_->Read(pos_, n, result, scratch);
+    if (read_status.ok()) {
+      pos_ += result->size();
+    }
+    return read_status;
+  }
+
+  // Advances the position by at most `n` bytes, stopping at end of file.
+  // Fails if the position is already past the end.
+  Status Skip(uint64_t n) override {
+    if (pos_ > file_->Size()) {
+      return Status::IOError("pos_ > file_->Size()");
+    }
+    const uint64_t remaining = file_->Size() - pos_;
+    pos_ += static_cast<size_t>(std::min(n, remaining));
+    return Status::OK();
+  }
+
+ private:
+  MemFile* file_;
+  size_t pos_;
+};
+
+// RandomAccessFile backed by a MemFile. Holds one reference on the MemFile
+// for its whole lifetime and forwards reads to it.
+class MockRandomAccessFile : public RandomAccessFile {
+ public:
+  explicit MockRandomAccessFile(MemFile* file) : file_(file) {
+    file_->Ref();
+  }
+
+  ~MockRandomAccessFile() override {
+    file_->Unref();
+  }
+
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) const override {
+    return file_->Read(offset, n, result, scratch);
+  }
+
+ private:
+  MemFile* file_;
+};
+
+// RandomRWFile backed by a MemFile. Holds one reference on the MemFile for
+// its whole lifetime. Close() and Sync() both call MemFile::Fsync();
+// Flush() does nothing.
+class MockRandomRWFile : public RandomRWFile {
+ public:
+  explicit MockRandomRWFile(MemFile* file) : file_(file) {
+    file_->Ref();
+  }
+
+  ~MockRandomRWFile() override {
+    file_->Unref();
+  }
+
+  Status Write(uint64_t offset, const Slice& data) override {
+    return file_->Write(offset, data);
+  }
+
+  Status Read(uint64_t offset, size_t n, Slice* result,
+              char* scratch) const override {
+    return file_->Read(offset, n, result, scratch);
+  }
+
+  Status Close() override {
+    return file_->Fsync();
+  }
+
+  Status Flush() override {
+    return Status::OK();
+  }
+
+  Status Sync() override {
+    return file_->Fsync();
+  }
+
+ private:
+  MemFile* file_;
+};
+
+// WritableFile backed by a MemFile, with optional rate limiting. Holds one
+// reference on the MemFile for its whole lifetime.
+class MockWritableFile : public WritableFile {
+ public:
+  MockWritableFile(MemFile* file, RateLimiter* rate_limiter)
+      : file_(file), rate_limiter_(rate_limiter) {
+    file_->Ref();
+  }
+
+  ~MockWritableFile() override { file_->Unref(); }
+
+  // Appends `data` in chunks whose size is decided by RequestToken(),
+  // stopping at the first failing chunk.
+  Status Append(const Slice& data) override {
+    const size_t total = data.size();
+    for (size_t written = 0; written < total;) {
+      const size_t chunk = RequestToken(total - written);
+      Status s = file_->Append(Slice(data.data() + written, chunk));
+      if (!s.ok()) {
+        return s;
+      }
+      written += chunk;
+    }
+    return Status::OK();
+  }
+
+  Status Truncate(uint64_t size) override {
+    file_->Truncate(static_cast<size_t>(size));
+    return Status::OK();
+  }
+
+  Status Close() override { return file_->Fsync(); }
+
+  Status Flush() override { return Status::OK(); }
+
+  Status Sync() override { return file_->Fsync(); }
+
+  uint64_t GetFileSize() override { return file_->Size(); }
+
+ private:
+  // Returns how many of `bytes` may be written now. Without a rate limiter,
+  // or when io_priority_ is not below Env::IO_TOTAL, the request is granted
+  // in full; otherwise it is capped at the limiter's single-burst size and
+  // charged to the limiter.
+  inline size_t RequestToken(size_t bytes) {
+    if (!rate_limiter_ || io_priority_ >= Env::IO_TOTAL) {
+      return bytes;
+    }
+    const size_t burst =
+        static_cast<size_t>(rate_limiter_->GetSingleBurstBytes());
+    bytes = std::min(bytes, burst);
+    rate_limiter_->Request(bytes, io_priority_);
+    return bytes;
+  }
+
+  MemFile* file_;
+  RateLimiter* rate_limiter_;
+};
+
+// Directory stand-in whose Fsync() does nothing and reports success.
+class MockEnvDirectory : public Directory {
+ public:
+  Status Fsync() override {
+    return Status::OK();
+  }
+};
+
+// FileLock that only remembers the name of the file it was created for.
+class MockEnvFileLock : public FileLock {
+ public:
+  explicit MockEnvFileLock(const std::string& fname) : fname_(fname) {}
+
+  // Returns a copy of the stored file name.
+  std::string FileName() const {
+    return fname_;
+  }
+
+ private:
+  const std::string fname_;
+};
+
+// Logger that formats each message with a timestamp prefix and appends it
+// to a WritableFile. It tracks the total number of bytes written and a
+// "flush pending" flag; Flush() only updates that bookkeeping.
+class TestMemLogger : public Logger {
+ private:
+ std::unique_ptr<WritableFile> file_;
+ std::atomic_size_t log_size_;
+ static const uint64_t flush_every_seconds_ = 5;
+ std::atomic_uint_fast64_t last_flush_micros_;
+ Env* env_;
+ std::atomic<bool> flush_pending_;
+
+ public:
+ TestMemLogger(std::unique_ptr<WritableFile> f, Env* env,
+ const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
+ : Logger(log_level),
+ file_(std::move(f)),
+ log_size_(0),
+ last_flush_micros_(0),
+ env_(env),
+ flush_pending_(false) {}
+ ~TestMemLogger() override {}
+
+ // Clears the pending flag and records the time from env_->NowMicros().
+ // The underlying file is not touched.
+ void Flush() override {
+ if (flush_pending_) {
+ flush_pending_ = false;
+ }
+ last_flush_micros_ = env_->NowMicros();
+ }
+
+ using Logger::Logv;
+ // Formats one log line ("YYYY/MM/DD-hh:mm:ss.uuuuuu <message>\n") and
+ // appends it to file_. The Append() result is not checked.
+ void Logv(const char* format, va_list ap) override {
+ // We try twice: the first time with a fixed-size stack allocated buffer,
+ // and the second time with a much larger dynamically allocated buffer.
+ char buffer[500];
+ for (int iter = 0; iter < 2; iter++) {
+ char* base;
+ int bufsize;
+ if (iter == 0) {
+ bufsize = sizeof(buffer);
+ base = buffer;
+ } else {
+ bufsize = 30000;
+ base = new char[bufsize];
+ }
+ char* p = base;
+ char* limit = base + bufsize;
+
+ // The timestamp comes from gettimeofday(), not from env_.
+ struct timeval now_tv;
+ gettimeofday(&now_tv, nullptr);
+ const time_t seconds = now_tv.tv_sec;
+ struct tm t;
+ memset(&t, 0, sizeof(t));
+ struct tm* ret __attribute__((__unused__));
+ ret = localtime_r(&seconds, &t);
+ assert(ret);
+ p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d ",
+ t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
+ t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec));
+
+ // Print the message
+ if (p < limit) {
+ // Work on a copy so `ap` is still usable on the second pass.
+ va_list backup_ap;
+ va_copy(backup_ap, ap);
+ p += vsnprintf(p, limit - p, format, backup_ap);
+ va_end(backup_ap);
+ }
+
+ // Truncate to available space if necessary
+ if (p >= limit) {
+ if (iter == 0) {
+ continue; // Try again with larger buffer
+ } else {
+ // Leave room for the trailing newline added below.
+ p = limit - 1;
+ }
+ }
+
+ // Add newline if necessary
+ if (p == base || p[-1] != '\n') {
+ *p++ = '\n';
+ }
+
+ assert(p <= limit);
+ const size_t write_size = p - base;
+
+ file_->Append(Slice(base, write_size));
+ flush_pending_ = true;
+ log_size_ += write_size;
+ uint64_t now_micros =
+ static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + now_tv.tv_usec;
+ // Once flush_every_seconds_ have passed since the last recorded flush,
+ // reset the bookkeeping as if a flush had happened.
+ if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
+ flush_pending_ = false;
+ last_flush_micros_ = now_micros;
+ }
+ // Free the heap buffer used on the second pass.
+ if (base != buffer) {
+ delete[] base;
+ }
+ break;
+ }
+ }
+ // Total bytes handed to file_->Append() so far.
+ size_t GetLogFileSize() const override { return log_size_; }
+};
+
+} // Anonymous namespace
+
+// Wraps `base_env` and starts with zero accumulated fake sleep time.
+MockEnv::MockEnv(Env* base_env)
+    : EnvWrapper(base_env),
+      fake_sleep_micros_(0) {
+}
+
+// Drops the reference that file_map_ holds on every file.
+MockEnv::~MockEnv() {
+  for (auto& entry : file_map_) {
+    entry.second->Unref();
+  }
+}
+
+// Partial implementation of the Env interface.
+// Opens an existing in-memory file for sequential reading. A missing file
+// yields IOError and sets `*result` to null; a lock file yields
+// InvalidArgument and leaves `*result` untouched.
+Status MockEnv::NewSequentialFile(const std::string& fname,
+                                  std::unique_ptr<SequentialFile>* result,
+                                  const EnvOptions& /*soptions*/) {
+  auto fn = NormalizePath(fname);
+  MutexLock lock(&mutex_);
+  const auto found = file_map_.find(fn);
+  if (found == file_map_.end()) {
+    *result = nullptr;
+    return Status::IOError(fn, "File not found");
+  }
+  MemFile* const mem_file = found->second;
+  if (mem_file->is_lock_file()) {
+    return Status::InvalidArgument(fn, "Cannot open a lock file.");
+  }
+  result->reset(new MockSequentialFile(mem_file));
+  return Status::OK();
+}
+
+// Opens an existing in-memory file for random-access reading. A missing
+// file yields IOError and sets `*result` to null; a lock file yields
+// InvalidArgument and leaves `*result` untouched.
+Status MockEnv::NewRandomAccessFile(const std::string& fname,
+                                    std::unique_ptr<RandomAccessFile>* result,
+                                    const EnvOptions& /*soptions*/) {
+  auto fn = NormalizePath(fname);
+  MutexLock lock(&mutex_);
+  const auto found = file_map_.find(fn);
+  if (found == file_map_.end()) {
+    *result = nullptr;
+    return Status::IOError(fn, "File not found");
+  }
+  MemFile* const mem_file = found->second;
+  if (mem_file->is_lock_file()) {
+    return Status::InvalidArgument(fn, "Cannot open a lock file.");
+  }
+  result->reset(new MockRandomAccessFile(mem_file));
+  return Status::OK();
+}
+
+// Opens an existing in-memory file for random reads and writes. A missing
+// file yields IOError and sets `*result` to null; a lock file yields
+// InvalidArgument and leaves `*result` untouched.
+Status MockEnv::NewRandomRWFile(const std::string& fname,
+                                std::unique_ptr<RandomRWFile>* result,
+                                const EnvOptions& /*soptions*/) {
+  auto fn = NormalizePath(fname);
+  MutexLock lock(&mutex_);
+  const auto found = file_map_.find(fn);
+  if (found == file_map_.end()) {
+    *result = nullptr;
+    return Status::IOError(fn, "File not found");
+  }
+  MemFile* const mem_file = found->second;
+  if (mem_file->is_lock_file()) {
+    return Status::InvalidArgument(fn, "Cannot open a lock file.");
+  }
+  result->reset(new MockRandomRWFile(mem_file));
+  return Status::OK();
+}
+
+// Renames `old_fname` to `fname`, then opens `fname` through
+// NewWritableFile(). If the rename fails its status is returned and
+// `*result` is left untouched.
+Status MockEnv::ReuseWritableFile(const std::string& fname,
+                                  const std::string& old_fname,
+                                  std::unique_ptr<WritableFile>* result,
+                                  const EnvOptions& options) {
+  const Status rename_status = RenameFile(old_fname, fname);
+  if (rename_status.ok()) {
+    result->reset();
+    return NewWritableFile(fname, result, options);
+  }
+  return rename_status;
+}
+
+// Creates a fresh, empty in-memory file under `fname`, replacing any file
+// already registered under that name. The map keeps one reference on the
+// new MemFile; the returned writer takes its own.
+Status MockEnv::NewWritableFile(const std::string& fname,
+                                std::unique_ptr<WritableFile>* result,
+                                const EnvOptions& env_options) {
+  auto fn = NormalizePath(fname);
+  MutexLock lock(&mutex_);
+  const bool already_present = file_map_.find(fn) != file_map_.end();
+  if (already_present) {
+    DeleteFileInternal(fn);
+  }
+  MemFile* created = new MemFile(this, fn, false);
+  created->Ref();
+  file_map_[fn] = created;
+
+  result->reset(new MockWritableFile(created, env_options.rate_limiter));
+  return Status::OK();
+}
+
+// Hands back a MockEnvDirectory regardless of the name given; always
+// succeeds.
+Status MockEnv::NewDirectory(const std::string& /*name*/,
+                             std::unique_ptr<Directory>* result) {
+  Directory* const dir = new MockEnvDirectory();
+  result->reset(dir);
+  return Status::OK();
+}
+
+// Returns OK when `fname` is registered as a file, or when some registered
+// path starts with "<fname>/" (so `fname` acts as a directory). Otherwise
+// returns NotFound.
+Status MockEnv::FileExists(const std::string& fname) {
+  auto fn = NormalizePath(fname);
+  MutexLock lock(&mutex_);
+  if (file_map_.find(fn) != file_map_.end()) {
+    // File exists
+    return Status::OK();
+  }
+  // Now also check if fn exists as a dir
+  const size_t dir_len = fn.size();
+  for (const auto& entry : file_map_) {
+    const std::string& candidate = entry.first;
+    if (candidate.size() < dir_len + 1) {
+      continue;
+    }
+    if (candidate[dir_len] != '/') {
+      continue;
+    }
+    if (Slice(candidate).starts_with(Slice(fn))) {
+      return Status::OK();
+    }
+  }
+  return Status::NotFound();
+}
+
+// Lists the immediate children of `dir`. For every registered path under
+// "<dir>/", the first path component after the prefix is collected;
+// adjacent duplicates are then removed. Returns NotFound when neither `dir`
+// itself nor anything beneath it is registered.
+Status MockEnv::GetChildren(const std::string& dir,
+                            std::vector<std::string>* result) {
+  auto d = NormalizePath(dir);
+  const size_t child_start = d.size() + 1;
+  bool found_dir = false;
+  {
+    MutexLock lock(&mutex_);
+    result->clear();
+    for (const auto& entry : file_map_) {
+      const std::string& path = entry.first;
+
+      if (path == d) {
+        found_dir = true;
+        continue;
+      }
+      if (path.size() < child_start || path[d.size()] != '/' ||
+          !Slice(path).starts_with(Slice(d))) {
+        continue;
+      }
+      found_dir = true;
+      const size_t next_slash = path.find('/', child_start);
+      if (next_slash == std::string::npos) {
+        result->push_back(path.substr(child_start));
+      } else {
+        result->push_back(path.substr(child_start, next_slash - child_start));
+      }
+    }
+  }
+  result->erase(std::unique(result->begin(), result->end()), result->end());
+  return found_dir ? Status::OK() : Status::NotFound();
+}
+
+// Removes fname from the map and releases the map's reference on the file.
+// fname must already be normalized (asserted). Does nothing when the name
+// is absent. This function does not take mutex_ itself; the callers visible
+// in this file hold it.
+void MockEnv::DeleteFileInternal(const std::string& fname) {
+  assert(fname == NormalizePath(fname));
+  auto entry = file_map_.find(fname);
+  if (entry == file_map_.end()) {
+    return;
+  }
+  entry->second->Unref();
+  file_map_.erase(entry);
+}
+
+// Deletes the entry for the normalized fname. Returns IOError("File not
+// found") when there is no such entry, OK otherwise.
+Status MockEnv::DeleteFile(const std::string& fname) {
+  const std::string path = NormalizePath(fname);
+  MutexLock guard(&mutex_);
+  const bool present = file_map_.find(path) != file_map_.end();
+  if (present) {
+    DeleteFileInternal(path);
+    return Status::OK();
+  }
+  return Status::IOError(path, "File not found");
+}
+
+// Forwards to MemFile::Truncate(size) on the file stored under the
+// normalized fname. Returns IOError("File not found") when it is absent.
+Status MockEnv::Truncate(const std::string& fname, size_t size) {
+  const std::string path = NormalizePath(fname);
+  MutexLock guard(&mutex_);
+  const auto entry = file_map_.find(path);
+  if (entry != file_map_.end()) {
+    entry->second->Truncate(size);
+    return Status::OK();
+  }
+  return Status::IOError(path, "File not found");
+}
+
+// Registers dirname (normalized) as a directory by inserting an empty
+// MemFile entry under that name.
+//
+// Returns OK when the entry was created, or a bare IOError when an entry
+// with that name already exists.
+Status MockEnv::CreateDir(const std::string& dirname) {
+  auto dn = NormalizePath(dirname);
+  // file_map_ is protected by mutex_. Without the lock, this lookup and
+  // insert race with every other operation on the map.
+  MutexLock lock(&mutex_);
+  if (file_map_.find(dn) != file_map_.end()) {
+    return Status::IOError();
+  }
+  MemFile* file = new MemFile(this, dn, false);
+  file->Ref();  // reference held by the map
+  file_map_[dn] = file;
+  return Status::OK();
+}
+
+// Creates dirname unless it is already present. Always returns OK: the only
+// failure CreateDir reports is "entry already exists", which is acceptable
+// here, so its status is deliberately discarded.
+Status MockEnv::CreateDirIfMissing(const std::string& dirname) {
+  const Status ignored = CreateDir(dirname);
+  (void)ignored;
+  return Status::OK();
+}
+
+// Directories created through CreateDir are ordinary map entries, so
+// removing one is the same operation as DeleteFile (including its
+// "File not found" error when the entry is absent).
+Status MockEnv::DeleteDir(const std::string& dirname) {
+  Status status = DeleteFile(dirname);
+  return status;
+}
+
+// Stores the size reported by the file under the normalized fname in
+// *file_size. Returns IOError("File not found") and leaves *file_size
+// untouched when there is no such entry.
+Status MockEnv::GetFileSize(const std::string& fname, uint64_t* file_size) {
+  const std::string path = NormalizePath(fname);
+  MutexLock guard(&mutex_);
+  const auto entry = file_map_.find(path);
+  if (entry != file_map_.end()) {
+    *file_size = entry->second->Size();
+    return Status::OK();
+  }
+  return Status::IOError(path, "File not found");
+}
+
+// Stores the value of MemFile::ModifiedTime() for the file under the
+// normalized fname in *time. Returns IOError("File not found") and leaves
+// *time untouched when there is no such entry.
+Status MockEnv::GetFileModificationTime(const std::string& fname,
+                                        uint64_t* time) {
+  const std::string path = NormalizePath(fname);
+  MutexLock guard(&mutex_);
+  const auto entry = file_map_.find(path);
+  if (entry != file_map_.end()) {
+    *time = entry->second->ModifiedTime();
+    return Status::OK();
+  }
+  return Status::IOError(path, "File not found");
+}
+
+// Moves the entry stored under src to dest, replacing any entry that
+// already exists at dest. Both names are normalized first.
+//
+// Returns IOError("File not found") when src has no entry, OK otherwise.
+// Renaming a file onto itself leaves the map unchanged.
+Status MockEnv::RenameFile(const std::string& src, const std::string& dest) {
+  auto s = NormalizePath(src);
+  auto t = NormalizePath(dest);
+  MutexLock lock(&mutex_);
+  auto src_iter = file_map_.find(s);
+  if (src_iter == file_map_.end()) {
+    return Status::IOError(s, "File not found");
+  }
+
+  // Detach the source entry before clearing the destination. If s == t,
+  // clearing the destination first would drop the very file being renamed
+  // and leave a null entry behind. With the source already detached, the
+  // delete below finds nothing in that case and the entry is re-inserted.
+  MemFile* file = src_iter->second;
+  file_map_.erase(src_iter);
+  DeleteFileInternal(t);
+  file_map_[t] = file;  // the map's reference moves with the entry
+  return Status::OK();
+}
+
+// Makes dest a second name for the file stored under src, replacing any
+// entry that already exists at dest. Both names are normalized first.
+//
+// Returns IOError("File not found") when src has no entry, OK otherwise.
+// Linking a file to itself leaves the map unchanged.
+Status MockEnv::LinkFile(const std::string& src, const std::string& dest) {
+  auto s = NormalizePath(src);
+  auto t = NormalizePath(dest);
+  MutexLock lock(&mutex_);
+  auto src_iter = file_map_.find(s);
+  if (src_iter == file_map_.end()) {
+    return Status::IOError(s, "File not found");
+  }
+
+  // Take the new entry's reference before dropping the old destination.
+  // Otherwise the file might get deleted when no one uses s; and when
+  // s == t, dropping the destination first would remove the source itself
+  // and leave a null pointer to dereference.
+  MemFile* file = src_iter->second;
+  file->Ref();
+  DeleteFileInternal(t);  // may invalidate src_iter; it is not used again
+  file_map_[t] = file;
+  return Status::OK();
+}
+
+// Returns in *result a TestMemLogger that writes to the in-memory file
+// stored under the normalized fname. An existing file is reused; otherwise
+// an empty one is created and added to the map. Always returns OK.
+Status MockEnv::NewLogger(const std::string& fname,
+                          std::shared_ptr<Logger>* result) {
+  const std::string path = NormalizePath(fname);
+  MutexLock guard(&mutex_);
+  MemFile* backing = nullptr;
+  const auto existing = file_map_.find(path);
+  if (existing != file_map_.end()) {
+    backing = existing->second;
+  } else {
+    backing = new MemFile(this, path, false);
+    backing->Ref();  // reference held by the map
+    file_map_[path] = backing;
+  }
+  std::unique_ptr<WritableFile> sink(new MockWritableFile(backing, nullptr));
+  result->reset(new TestMemLogger(std::move(sink), this));
+  return Status::OK();
+}
+
+// Acquires the lock file stored under the normalized fname and returns a
+// MockEnvFileLock handle in *flock.
+//
+// - No entry yet: a lock file is created, locked and added to the map.
+// - Entry exists but is not a lock file: InvalidArgument.
+// - Entry exists and Lock() fails: IOError("Lock is already held.").
+// *flock is only written on success.
+Status MockEnv::LockFile(const std::string& fname, FileLock** flock) {
+  const std::string path = NormalizePath(fname);
+  {
+    MutexLock guard(&mutex_);
+    const auto existing = file_map_.find(path);
+    if (existing == file_map_.end()) {
+      MemFile* lock_file = new MemFile(this, path, true);
+      lock_file->Ref();  // reference held by the map
+      lock_file->Lock();
+      file_map_[path] = lock_file;
+    } else {
+      MemFile* candidate = existing->second;
+      if (!candidate->is_lock_file()) {
+        return Status::InvalidArgument(fname, "Not a lock file.");
+      }
+      if (!candidate->Lock()) {
+        return Status::IOError(path, "Lock is already held.");
+      }
+    }
+  }
+  *flock = new MockEnvFileLock(path);
+  return Status::OK();
+}
+
+// Releases the lock identified by flock and deletes the handle.
+//
+// If the named file is still in the map it must be a lock file; otherwise
+// InvalidArgument is returned and flock is NOT deleted. A file that is no
+// longer in the map is not an error: the handle is deleted and OK returned.
+Status MockEnv::UnlockFile(FileLock* flock) {
+  const std::string path =
+      static_cast_with_check<MockEnvFileLock, FileLock>(flock)->FileName();
+  {
+    MutexLock guard(&mutex_);
+    const auto held = file_map_.find(path);
+    if (held != file_map_.end()) {
+      MemFile* lock_file = held->second;
+      if (!lock_file->is_lock_file()) {
+        return Status::InvalidArgument(path, "Not a lock file.");
+      }
+      lock_file->Unlock();
+    }
+  }
+  delete flock;
+  return Status::OK();
+}
+
+// Reports the fixed in-memory test directory "/test"; always returns OK.
+Status MockEnv::GetTestDirectory(std::string* path) {
+  path->assign("/test");
+  return Status::OK();
+}
+
+// Returns the wrapped Env's current time, shifted forward by the whole
+// seconds accumulated through FakeSleepForMicroseconds(). When the wrapped
+// call fails, its status is returned and no shift is applied.
+Status MockEnv::GetCurrentTime(int64_t* unix_time) {
+  Status base_status = EnvWrapper::GetCurrentTime(unix_time);
+  if (!base_status.ok()) {
+    return base_status;
+  }
+  const int64_t kMicrosPerSecond = 1000 * 1000;
+  *unix_time += fake_sleep_micros_.load() / kMicrosPerSecond;
+  return base_status;
+}
+
+// Wrapped Env's microsecond clock plus the accumulated fake sleep.
+uint64_t MockEnv::NowMicros() {
+  const uint64_t real_micros = EnvWrapper::NowMicros();
+  return real_micros + fake_sleep_micros_.load();
+}
+
+// Wrapped Env's nanosecond clock plus the accumulated fake sleep, which is
+// tracked in microseconds and therefore scaled by 1000.
+uint64_t MockEnv::NowNanos() {
+  const int64_t fake_nanos = fake_sleep_micros_.load() * 1000;
+  return EnvWrapper::NowNanos() + fake_nanos;
+}
+
+// Calls MemFile::CorruptBuffer() on the file stored under the normalized
+// fname. Returns IOError("File not found") when there is no such entry.
+Status MockEnv::CorruptBuffer(const std::string& fname) {
+  const std::string path = NormalizePath(fname);
+  MutexLock guard(&mutex_);
+  const auto entry = file_map_.find(path);
+  if (entry != file_map_.end()) {
+    entry->second->CorruptBuffer();
+    return Status::OK();
+  }
+  return Status::IOError(path, "File not found");
+}
+
+// Returns path with every run of consecutive '/' characters collapsed to a
+// single '/'. Nothing else is changed (no "." or ".." handling, trailing
+// slashes are kept).
+std::string MockEnv::NormalizePath(const std::string path) {
+  std::string normalized;
+  for (const char ch : path) {
+    const bool repeated_slash =
+        ch == '/' && !normalized.empty() && normalized.back() == '/';
+    if (!repeated_slash) {
+      normalized.push_back(ch);
+    }
+  }
+  return normalized;
+}
+
+// Advances the fake clock offset by micros without blocking. The offset is
+// added to the results of GetCurrentTime(), NowMicros() and NowNanos().
+void MockEnv::FakeSleepForMicroseconds(int64_t micros) {
+  fake_sleep_micros_ += micros;
+}
+
+#ifndef ROCKSDB_LITE
+// Kept to maintain the behavior from before switching from InMemoryEnv to MockEnv.
+Env* NewMemEnv(Env* base_env) { return new MockEnv(base_env); }
+
+#else // ROCKSDB_LITE
+
+Env* NewMemEnv(Env* /*base_env*/) { return nullptr; }  // LITE builds return no Env here
+
+#endif // !ROCKSDB_LITE
+
+} // namespace rocksdb
diff --git a/src/rocksdb/env/mock_env.h b/src/rocksdb/env/mock_env.h
new file mode 100644
index 00000000..87b8deaf
--- /dev/null
+++ b/src/rocksdb/env/mock_env.h
@@ -0,0 +1,114 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+#pragma once
+
+#include <atomic>
+#include <map>
+#include <string>
+#include <vector>
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "port/port.h"
+#include "util/mutexlock.h"
+
+namespace rocksdb {
+
+class MemFile;
+// Env whose file operations act on an in-memory map of MemFile objects
+// instead of a real file system, and whose clock can be advanced
+// artificially through FakeSleepForMicroseconds().
+class MockEnv : public EnvWrapper {
+ public:
+  explicit MockEnv(Env* base_env);
+
+  virtual ~MockEnv();
+
+  // Partial implementation of the Env interface.
+  Status NewSequentialFile(const std::string& fname,
+                           std::unique_ptr<SequentialFile>* result,
+                           const EnvOptions& soptions) override;
+
+  Status NewRandomAccessFile(const std::string& fname,
+                             std::unique_ptr<RandomAccessFile>* result,
+                             const EnvOptions& soptions) override;
+
+  Status NewRandomRWFile(const std::string& fname,
+                         std::unique_ptr<RandomRWFile>* result,
+                         const EnvOptions& options) override;
+
+  Status ReuseWritableFile(const std::string& fname,
+                           const std::string& old_fname,
+                           std::unique_ptr<WritableFile>* result,
+                           const EnvOptions& options) override;
+
+  Status NewWritableFile(const std::string& fname,
+                         std::unique_ptr<WritableFile>* result,
+                         const EnvOptions& env_options) override;
+
+  Status NewDirectory(const std::string& name,
+                      std::unique_ptr<Directory>* result) override;
+
+  Status FileExists(const std::string& fname) override;
+
+  Status GetChildren(const std::string& dir,
+                     std::vector<std::string>* result) override;
+
+  // Removes fname from file_map_ without taking mutex_.
+  void DeleteFileInternal(const std::string& fname);
+
+  Status DeleteFile(const std::string& fname) override;
+
+  Status Truncate(const std::string& fname, size_t size) override;
+
+  Status CreateDir(const std::string& dirname) override;
+
+  Status CreateDirIfMissing(const std::string& dirname) override;
+
+  Status DeleteDir(const std::string& dirname) override;
+
+  Status GetFileSize(const std::string& fname, uint64_t* file_size) override;
+
+  Status GetFileModificationTime(const std::string& fname,
+                                 uint64_t* time) override;
+
+  Status RenameFile(const std::string& src, const std::string& target) override;
+
+  Status LinkFile(const std::string& src, const std::string& target) override;
+
+  Status NewLogger(const std::string& fname,
+                   std::shared_ptr<Logger>* result) override;
+
+  Status LockFile(const std::string& fname, FileLock** flock) override;
+
+  Status UnlockFile(FileLock* flock) override;
+
+  Status GetTestDirectory(std::string* path) override;
+
+  // Results of these can be affected by FakeSleepForMicroseconds()
+  Status GetCurrentTime(int64_t* unix_time) override;
+  uint64_t NowMicros() override;
+  uint64_t NowNanos() override;
+
+  // Not part of the Env interface.
+  Status CorruptBuffer(const std::string& fname);
+
+  // Doesn't really sleep, just affects output of GetCurrentTime(), NowMicros()
+  // and NowNanos()
+  void FakeSleepForMicroseconds(int64_t micros);
+
+ private:
+  std::string NormalizePath(const std::string path);
+
+  // Map from filenames to MemFile objects, representing a simple file system.
+  using FileSystem = std::map<std::string, MemFile*>;
+  port::Mutex mutex_;
+  FileSystem file_map_;  // Protected by mutex_.
+
+  // Fake time offset in microseconds, added to the clock results.
+  std::atomic<int64_t> fake_sleep_micros_;
+};
+
+} // namespace rocksdb
diff --git a/src/rocksdb/env/mock_env_test.cc b/src/rocksdb/env/mock_env_test.cc
new file mode 100644
index 00000000..2daf682e
--- /dev/null
+++ b/src/rocksdb/env/mock_env_test.cc
@@ -0,0 +1,83 @@
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "env/mock_env.h"
+
+#include <memory>
+#include <string>
+
+#include "rocksdb/env.h"
+#include "util/testharness.h"
+
+namespace rocksdb {
+
+// Fixture that owns a fresh MockEnv layered over Env::Default() for each
+// test and deletes it afterwards.
+class MockEnvTest : public testing::Test {
+ public:
+  MockEnvTest() : env_(new MockEnv(Env::Default())) {}
+  ~MockEnvTest() override { delete env_; }
+
+  MockEnv* env_;               // owned; released in the destructor
+  const EnvOptions soptions_;  // default options passed to file factories
+};
+
+// Checks that CorruptBuffer() leaves data written before the last Fsync()
+// readable and unchanged, while data appended after it reads back different
+// from what was written.
+TEST_F(MockEnvTest, Corrupt) {
+  const std::string kGood = "this is a good string, synced to disk";
+  const std::string kCorrupted = "this part may be corrupted";
+  const std::string kFileName = "/dir/f";
+  std::unique_ptr<WritableFile> writable_file;
+  ASSERT_OK(env_->NewWritableFile(kFileName, &writable_file, soptions_));
+  ASSERT_OK(writable_file->Append(kGood));
+  // ASSERT_EQ (rather than ASSERT_TRUE on ==) prints both sizes on failure.
+  ASSERT_EQ(writable_file->GetFileSize(), kGood.size());
+
+  std::string scratch;
+  scratch.resize(kGood.size() + kCorrupted.size() + 16);
+  Slice result;
+  std::unique_ptr<RandomAccessFile> rand_file;
+  ASSERT_OK(env_->NewRandomAccessFile(kFileName, &rand_file, soptions_));
+  ASSERT_OK(rand_file->Read(0, kGood.size(), &result, &(scratch[0])));
+  ASSERT_EQ(result.compare(kGood), 0);
+
+  // Sync + corrupt => no change
+  ASSERT_OK(writable_file->Fsync());
+  // env_ is already a MockEnv*, so no cast is needed to reach CorruptBuffer.
+  ASSERT_OK(env_->CorruptBuffer(kFileName));
+  result.clear();
+  ASSERT_OK(rand_file->Read(0, kGood.size(), &result, &(scratch[0])));
+  ASSERT_EQ(result.compare(kGood), 0);
+
+  // Add new data and corrupt it
+  ASSERT_OK(writable_file->Append(kCorrupted));
+  ASSERT_EQ(writable_file->GetFileSize(), kGood.size() + kCorrupted.size());
+  result.clear();
+  ASSERT_OK(rand_file->Read(kGood.size(), kCorrupted.size(),
+                            &result, &(scratch[0])));
+  ASSERT_EQ(result.compare(kCorrupted), 0);
+  // Corrupted
+  ASSERT_OK(env_->CorruptBuffer(kFileName));
+  result.clear();
+  ASSERT_OK(rand_file->Read(kGood.size(), kCorrupted.size(),
+                            &result, &(scratch[0])));
+  ASSERT_NE(result.compare(kCorrupted), 0);
+}
+
+// Checks that a fake sleep of three seconds moves GetCurrentTime() forward
+// by three seconds.
+TEST_F(MockEnvTest, FakeSleeping) {
+  int64_t before = 0;
+  ASSERT_OK(env_->GetCurrentTime(&before));
+
+  env_->FakeSleepForMicroseconds(3 * 1000 * 1000);
+
+  int64_t after = 0;
+  ASSERT_OK(env_->GetCurrentTime(&after));
+
+  const int64_t elapsed_seconds = after - before;
+  // 4 is tolerated because the real clock may cross a second boundary
+  // between the two reads; this holds unless the test runs for 2 seconds.
+  ASSERT_TRUE(elapsed_seconds == 3 || elapsed_seconds == 4);
+}
+
+} // namespace rocksdb
+
+// Test binary entry point: hands the command line to googletest and returns
+// the result of running every registered test.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  const int exit_code = RUN_ALL_TESTS();
+  return exit_code;
+}
diff --git a/src/rocksdb/env/posix_logger.h b/src/rocksdb/env/posix_logger.h
new file mode 100644
index 00000000..401df6a3
--- /dev/null
+++ b/src/rocksdb/env/posix_logger.h
@@ -0,0 +1,185 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+//
+// Logger implementation that can be shared by all environments
+// where enough posix functionality is available.
+
+#pragma once
+#include <errno.h>
+#include <fcntl.h>
+#include <stdio.h>
+#include <time.h>
+#include <algorithm>
+#include "port/sys_time.h"
+
+#ifdef OS_LINUX
+#ifndef FALLOC_FL_KEEP_SIZE
+#include <linux/falloc.h>
+#endif
+#endif
+
+#include <atomic>
+#include "env/io_posix.h"
+#include "monitoring/iostats_context_imp.h"
+#include "rocksdb/env.h"
+#include "util/sync_point.h"
+
+namespace rocksdb {
+
+// Logger that writes timestamped, thread-tagged lines to a stdio FILE*.
+//
+// Output is buffered by stdio and flushed when at least
+// flush_every_seconds_ seconds have passed since the last flush, or when
+// Flush() is called. The number of bytes handed to fwrite() is tracked in
+// log_size_.
+class PosixLogger : public Logger {
+ private:
+  // Closes file_. Returns OK on success, otherwise an IOError.
+  //
+  // fclose() reports failure by returning EOF and storing the cause in
+  // errno, so errno -- not the return value -- is passed on as the error
+  // number.
+  // NOTE(review): assumes IOError()'s third parameter is an errno value;
+  // confirm against env/io_posix.h.
+  Status PosixCloseHelper() {
+    if (fclose(file_) != 0) {
+      const int close_errno = errno;
+      return IOError("Unable to close log file", "", close_errno);
+    }
+    return Status::OK();
+  }
+
+  FILE* file_;
+  uint64_t (*gettid_)();  // Return the thread id for the current thread
+  std::atomic_size_t log_size_;  // bytes written so far
+  int fd_;                       // descriptor behind file_, for fallocate()
+  const static uint64_t flush_every_seconds_ = 5;
+  std::atomic_uint_fast64_t last_flush_micros_;
+  Env* env_;
+  std::atomic<bool> flush_pending_;  // set after a write, cleared by Flush()
+
+ protected:
+  virtual Status CloseImpl() override { return PosixCloseHelper(); }
+
+ public:
+  // f: already-open stream to log to.
+  // gettid: returns an id for the calling thread, printed on every line.
+  // env: clock source used to timestamp flushes.
+  PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env,
+              const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
+      : Logger(log_level),
+        file_(f),
+        gettid_(gettid),
+        log_size_(0),
+        fd_(fileno(f)),
+        last_flush_micros_(0),
+        env_(env),
+        flush_pending_(false) {}
+
+  // Closes the file unless the logger was already closed. A close error
+  // cannot be reported from a destructor and is dropped.
+  virtual ~PosixLogger() {
+    if (!closed_) {
+      closed_ = true;
+      PosixCloseHelper();
+    }
+  }
+
+  // Flushes buffered output if anything was written since the last flush,
+  // and records the flush time.
+  virtual void Flush() override {
+    TEST_SYNC_POINT("PosixLogger::Flush:Begin1");
+    TEST_SYNC_POINT("PosixLogger::Flush:Begin2");
+    if (flush_pending_) {
+      flush_pending_ = false;
+      fflush(file_);
+    }
+    last_flush_micros_ = env_->NowMicros();
+  }
+
+  using Logger::Logv;
+
+  // Formats one log line ("<local time> <thread id in hex> <message>") and
+  // appends it to the file, adding a trailing newline if the message lacks
+  // one. Messages that do not fit into 65536 bytes are truncated.
+  virtual void Logv(const char* format, va_list ap) override {
+    IOSTATS_TIMER_GUARD(logger_nanos);
+
+    const uint64_t thread_id = (*gettid_)();
+
+    // We try twice: the first time with a fixed-size stack allocated buffer,
+    // and the second time with a much larger dynamically allocated buffer.
+    char buffer[500];
+    for (int iter = 0; iter < 2; iter++) {
+      char* base;
+      int bufsize;
+      if (iter == 0) {
+        bufsize = sizeof(buffer);
+        base = buffer;
+      } else {
+        bufsize = 65536;
+        base = new char[bufsize];
+      }
+      char* p = base;
+      char* limit = base + bufsize;
+
+      struct timeval now_tv;
+      gettimeofday(&now_tv, nullptr);
+      const time_t seconds = now_tv.tv_sec;
+      struct tm t;
+      localtime_r(&seconds, &t);
+      p += snprintf(p, limit - p,
+                    "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
+                    t.tm_year + 1900,
+                    t.tm_mon + 1,
+                    t.tm_mday,
+                    t.tm_hour,
+                    t.tm_min,
+                    t.tm_sec,
+                    static_cast<int>(now_tv.tv_usec),
+                    static_cast<long long unsigned int>(thread_id));
+
+      // Print the message
+      if (p < limit) {
+        // Work on a copy so that ap is still usable on the second pass.
+        va_list backup_ap;
+        va_copy(backup_ap, ap);
+        p += vsnprintf(p, limit - p, format, backup_ap);
+        va_end(backup_ap);
+      }
+
+      // Truncate to available space if necessary
+      if (p >= limit) {
+        if (iter == 0) {
+          continue;  // Try again with larger buffer
+        } else {
+          p = limit - 1;
+        }
+      }
+
+      // Add newline if necessary
+      if (p == base || p[-1] != '\n') {
+        *p++ = '\n';
+      }
+
+      assert(p <= limit);
+      const size_t write_size = p - base;
+
+#ifdef ROCKSDB_FALLOCATE_PRESENT
+      const int kDebugLogChunkSize = 128 * 1024;
+
+      // If this write would cross a boundary of kDebugLogChunkSize
+      // space, pre-allocate more space to avoid overly large
+      // allocations from filesystem allocsize options.
+      const size_t log_size = log_size_;
+      const size_t last_allocation_chunk =
+          ((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize);
+      const size_t desired_allocation_chunk =
+          ((kDebugLogChunkSize - 1 + log_size + write_size) /
+           kDebugLogChunkSize);
+      if (last_allocation_chunk != desired_allocation_chunk) {
+        // The result is not checked; the write below happens either way.
+        fallocate(
+            fd_, FALLOC_FL_KEEP_SIZE, 0,
+            static_cast<off_t>(desired_allocation_chunk * kDebugLogChunkSize));
+      }
+#endif
+
+      size_t sz = fwrite(base, 1, write_size, file_);
+      flush_pending_ = true;
+      if (sz > 0) {
+        log_size_ += write_size;
+      }
+      uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 +
+                            now_tv.tv_usec;
+      if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
+        Flush();
+      }
+      if (base != buffer) {
+        delete[] base;
+      }
+      break;
+    }
+  }
+
+  // Number of bytes logged so far, as tracked by Logv().
+  size_t GetLogFileSize() const override { return log_size_; }
+};
+
+} // namespace rocksdb