summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/env/env.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/env/env.cc475
1 files changed, 475 insertions, 0 deletions
diff --git a/src/rocksdb/env/env.cc b/src/rocksdb/env/env.cc
new file mode 100644
index 000000000..70f4b29f7
--- /dev/null
+++ b/src/rocksdb/env/env.cc
@@ -0,0 +1,475 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file. See the AUTHORS file for names of contributors.
+
+#include "rocksdb/env.h"
+
+#include <thread>
+#include "env/composite_env_wrapper.h"
+#include "logging/env_logger.h"
+#include "memory/arena.h"
+#include "options/db_options.h"
+#include "port/port.h"
+#include "port/sys_time.h"
+#include "rocksdb/options.h"
+#include "rocksdb/utilities/object_registry.h"
+#include "util/autovector.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+Env::~Env() {
+}
+
+Status Env::NewLogger(const std::string& fname,
+ std::shared_ptr<Logger>* result) {
+ return NewEnvLogger(fname, this, result);
+}
+
+Status Env::LoadEnv(const std::string& value, Env** result) {
+ Env* env = *result;
+ Status s;
+#ifndef ROCKSDB_LITE
+ s = ObjectRegistry::NewInstance()->NewStaticObject<Env>(value, &env);
+#else
+ s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
+#endif
+ if (s.ok()) {
+ *result = env;
+ }
+ return s;
+}
+
+Status Env::LoadEnv(const std::string& value, Env** result,
+ std::shared_ptr<Env>* guard) {
+ assert(result);
+ Status s;
+#ifndef ROCKSDB_LITE
+ Env* env = nullptr;
+ std::unique_ptr<Env> uniq_guard;
+ std::string err_msg;
+ assert(guard != nullptr);
+ env = ObjectRegistry::NewInstance()->NewObject<Env>(value, &uniq_guard,
+ &err_msg);
+ if (!env) {
+ s = Status::NotFound(std::string("Cannot load ") + Env::Type() + ": " +
+ value);
+ env = Env::Default();
+ }
+ if (s.ok() && uniq_guard) {
+ guard->reset(uniq_guard.release());
+ *result = guard->get();
+ } else {
+ *result = env;
+ }
+#else
+ (void)result;
+ (void)guard;
+ s = Status::NotSupported("Cannot load environment in LITE mode: ", value);
+#endif
+ return s;
+}
+
+std::string Env::PriorityToString(Env::Priority priority) {
+ switch (priority) {
+ case Env::Priority::BOTTOM:
+ return "Bottom";
+ case Env::Priority::LOW:
+ return "Low";
+ case Env::Priority::HIGH:
+ return "High";
+ case Env::Priority::USER:
+ return "User";
+ case Env::Priority::TOTAL:
+ assert(false);
+ }
+ return "Invalid";
+}
+
+uint64_t Env::GetThreadID() const {
+ std::hash<std::thread::id> hasher;
+ return hasher(std::this_thread::get_id());
+}
+
+Status Env::ReuseWritableFile(const std::string& fname,
+ const std::string& old_fname,
+ std::unique_ptr<WritableFile>* result,
+ const EnvOptions& options) {
+ Status s = RenameFile(old_fname, fname);
+ if (!s.ok()) {
+ return s;
+ }
+ return NewWritableFile(fname, result, options);
+}
+
+Status Env::GetChildrenFileAttributes(const std::string& dir,
+ std::vector<FileAttributes>* result) {
+ assert(result != nullptr);
+ std::vector<std::string> child_fnames;
+ Status s = GetChildren(dir, &child_fnames);
+ if (!s.ok()) {
+ return s;
+ }
+ result->resize(child_fnames.size());
+ size_t result_size = 0;
+ for (size_t i = 0; i < child_fnames.size(); ++i) {
+ const std::string path = dir + "/" + child_fnames[i];
+ if (!(s = GetFileSize(path, &(*result)[result_size].size_bytes)).ok()) {
+ if (FileExists(path).IsNotFound()) {
+ // The file may have been deleted since we listed the directory
+ continue;
+ }
+ return s;
+ }
+ (*result)[result_size].name = std::move(child_fnames[i]);
+ result_size++;
+ }
+ result->resize(result_size);
+ return Status::OK();
+}
+
+SequentialFile::~SequentialFile() {
+}
+
+RandomAccessFile::~RandomAccessFile() {
+}
+
+WritableFile::~WritableFile() {
+}
+
+MemoryMappedFileBuffer::~MemoryMappedFileBuffer() {}
+
+Logger::~Logger() {}
+
+Status Logger::Close() {
+ if (!closed_) {
+ closed_ = true;
+ return CloseImpl();
+ } else {
+ return Status::OK();
+ }
+}
+
+Status Logger::CloseImpl() { return Status::NotSupported(); }
+
+FileLock::~FileLock() {
+}
+
+void LogFlush(Logger *info_log) {
+ if (info_log) {
+ info_log->Flush();
+ }
+}
+
+static void Logv(Logger *info_log, const char* format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
+ info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
+ }
+}
+
+void Log(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Logv(info_log, format, ap);
+ va_end(ap);
+}
+
+void Logger::Logv(const InfoLogLevel log_level, const char* format, va_list ap) {
+ static const char* kInfoLogLevelNames[5] = { "DEBUG", "INFO", "WARN",
+ "ERROR", "FATAL" };
+ if (log_level < log_level_) {
+ return;
+ }
+
+ if (log_level == InfoLogLevel::INFO_LEVEL) {
+ // Doesn't print log level if it is INFO level.
+ // This is to avoid unexpected performance regression after we add
+ // the feature of log level. All the logs before we add the feature
+ // are INFO level. We don't want to add extra costs to those existing
+ // logging.
+ Logv(format, ap);
+ } else if (log_level == InfoLogLevel::HEADER_LEVEL) {
+ LogHeader(format, ap);
+ } else {
+ char new_format[500];
+ snprintf(new_format, sizeof(new_format) - 1, "[%s] %s",
+ kInfoLogLevelNames[log_level], format);
+ Logv(new_format, ap);
+ }
+}
+
+static void Logv(const InfoLogLevel log_level, Logger *info_log, const char *format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= log_level) {
+ if (log_level == InfoLogLevel::HEADER_LEVEL) {
+ info_log->LogHeader(format, ap);
+ } else {
+ info_log->Logv(log_level, format, ap);
+ }
+ }
+}
+
+void Log(const InfoLogLevel log_level, Logger* info_log, const char* format,
+ ...) {
+ va_list ap;
+ va_start(ap, format);
+ Logv(log_level, info_log, format, ap);
+ va_end(ap);
+}
+
+static void Headerv(Logger *info_log, const char *format, va_list ap) {
+ if (info_log) {
+ info_log->LogHeader(format, ap);
+ }
+}
+
+void Header(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Headerv(info_log, format, ap);
+ va_end(ap);
+}
+
+static void Debugv(Logger* info_log, const char* format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::DEBUG_LEVEL) {
+ info_log->Logv(InfoLogLevel::DEBUG_LEVEL, format, ap);
+ }
+}
+
+void Debug(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Debugv(info_log, format, ap);
+ va_end(ap);
+}
+
+static void Infov(Logger* info_log, const char* format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::INFO_LEVEL) {
+ info_log->Logv(InfoLogLevel::INFO_LEVEL, format, ap);
+ }
+}
+
+void Info(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Infov(info_log, format, ap);
+ va_end(ap);
+}
+
+static void Warnv(Logger* info_log, const char* format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::WARN_LEVEL) {
+ info_log->Logv(InfoLogLevel::WARN_LEVEL, format, ap);
+ }
+}
+
+void Warn(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Warnv(info_log, format, ap);
+ va_end(ap);
+}
+
+static void Errorv(Logger* info_log, const char* format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::ERROR_LEVEL) {
+ info_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
+ }
+}
+
+void Error(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Errorv(info_log, format, ap);
+ va_end(ap);
+}
+
+static void Fatalv(Logger* info_log, const char* format, va_list ap) {
+ if (info_log && info_log->GetInfoLogLevel() <= InfoLogLevel::FATAL_LEVEL) {
+ info_log->Logv(InfoLogLevel::FATAL_LEVEL, format, ap);
+ }
+}
+
+void Fatal(Logger* info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Fatalv(info_log, format, ap);
+ va_end(ap);
+}
+
+void LogFlush(const std::shared_ptr<Logger>& info_log) {
+ LogFlush(info_log.get());
+}
+
+void Log(const InfoLogLevel log_level, const std::shared_ptr<Logger>& info_log,
+ const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Logv(log_level, info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Header(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Headerv(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Debug(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Debugv(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Info(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Infov(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Warn(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Warnv(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Error(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Errorv(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Fatal(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Fatalv(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+void Log(const std::shared_ptr<Logger>& info_log, const char* format, ...) {
+ va_list ap;
+ va_start(ap, format);
+ Logv(info_log.get(), format, ap);
+ va_end(ap);
+}
+
+Status WriteStringToFile(Env* env, const Slice& data, const std::string& fname,
+ bool should_sync) {
+ std::unique_ptr<WritableFile> file;
+ EnvOptions soptions;
+ Status s = env->NewWritableFile(fname, &file, soptions);
+ if (!s.ok()) {
+ return s;
+ }
+ s = file->Append(data);
+ if (s.ok() && should_sync) {
+ s = file->Sync();
+ }
+ if (!s.ok()) {
+ env->DeleteFile(fname);
+ }
+ return s;
+}
+
+Status ReadFileToString(Env* env, const std::string& fname, std::string* data) {
+ LegacyFileSystemWrapper lfsw(env);
+ return ReadFileToString(&lfsw, fname, data);
+}
+
+EnvWrapper::~EnvWrapper() {
+}
+
+namespace { // anonymous namespace
+
+void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
+ env_options->use_mmap_reads = options.allow_mmap_reads;
+ env_options->use_mmap_writes = options.allow_mmap_writes;
+ env_options->use_direct_reads = options.use_direct_reads;
+ env_options->set_fd_cloexec = options.is_fd_close_on_exec;
+ env_options->bytes_per_sync = options.bytes_per_sync;
+ env_options->compaction_readahead_size = options.compaction_readahead_size;
+ env_options->random_access_max_buffer_size =
+ options.random_access_max_buffer_size;
+ env_options->rate_limiter = options.rate_limiter.get();
+ env_options->writable_file_max_buffer_size =
+ options.writable_file_max_buffer_size;
+ env_options->allow_fallocate = options.allow_fallocate;
+ env_options->strict_bytes_per_sync = options.strict_bytes_per_sync;
+ options.env->SanitizeEnvOptions(env_options);
+}
+
+}
+
+EnvOptions Env::OptimizeForLogWrite(const EnvOptions& env_options,
+ const DBOptions& db_options) const {
+ EnvOptions optimized_env_options(env_options);
+ optimized_env_options.bytes_per_sync = db_options.wal_bytes_per_sync;
+ optimized_env_options.writable_file_max_buffer_size =
+ db_options.writable_file_max_buffer_size;
+ return optimized_env_options;
+}
+
+EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
+ return env_options;
+}
+
+EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const {
+ EnvOptions optimized_env_options(env_options);
+ optimized_env_options.use_direct_reads = false;
+ return optimized_env_options;
+}
+
+EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const {
+ EnvOptions optimized_env_options(env_options);
+ optimized_env_options.use_direct_reads = false;
+ return optimized_env_options;
+}
+
+EnvOptions Env::OptimizeForCompactionTableWrite(
+ const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
+ EnvOptions optimized_env_options(env_options);
+ optimized_env_options.use_direct_writes =
+ db_options.use_direct_io_for_flush_and_compaction;
+ return optimized_env_options;
+}
+
+EnvOptions Env::OptimizeForCompactionTableRead(
+ const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
+ EnvOptions optimized_env_options(env_options);
+ optimized_env_options.use_direct_reads = db_options.use_direct_reads;
+ return optimized_env_options;
+}
+
+EnvOptions::EnvOptions(const DBOptions& options) {
+ AssignEnvOptions(this, options);
+}
+
+EnvOptions::EnvOptions() {
+ DBOptions options;
+ AssignEnvOptions(this, options);
+}
+
+Status NewEnvLogger(const std::string& fname, Env* env,
+ std::shared_ptr<Logger>* result) {
+ EnvOptions options;
+ // TODO: Tune the buffer size.
+ options.writable_file_max_buffer_size = 1024 * 1024;
+ std::unique_ptr<WritableFile> writable_file;
+ const auto status = env->NewWritableFile(fname, &writable_file, options);
+ if (!status.ok()) {
+ return status;
+ }
+
+ *result = std::make_shared<EnvLogger>(
+ NewLegacyWritableFileWrapper(std::move(writable_file)), fname, options,
+ env);
+ return Status::OK();
+}
+
+} // namespace ROCKSDB_NAMESPACE