summaryrefslogtreecommitdiffstats
path: root/src/rocksdb/logging/auto_roll_logger.cc
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/rocksdb/logging/auto_roll_logger.cc372
1 files changed, 372 insertions, 0 deletions
diff --git a/src/rocksdb/logging/auto_roll_logger.cc b/src/rocksdb/logging/auto_roll_logger.cc
new file mode 100644
index 000000000..fe0958479
--- /dev/null
+++ b/src/rocksdb/logging/auto_roll_logger.cc
@@ -0,0 +1,372 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
+//
+#include "logging/auto_roll_logger.h"
+
+#include <algorithm>
+
+#include "file/filename.h"
+#include "logging/logging.h"
+#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/system_clock.h"
+#include "util/mutexlock.h"
+
+namespace ROCKSDB_NAMESPACE {
+
+#ifndef ROCKSDB_LITE
+// -- AutoRollLogger
+
+AutoRollLogger::AutoRollLogger(const std::shared_ptr<FileSystem>& fs,
+ const std::shared_ptr<SystemClock>& clock,
+ const std::string& dbname,
+ const std::string& db_log_dir,
+ size_t log_max_size,
+ size_t log_file_time_to_roll,
+ size_t keep_log_file_num,
+ const InfoLogLevel log_level)
+ : Logger(log_level),
+ dbname_(dbname),
+ db_log_dir_(db_log_dir),
+ fs_(fs),
+ clock_(clock),
+ status_(Status::OK()),
+ kMaxLogFileSize(log_max_size),
+ kLogFileTimeToRoll(log_file_time_to_roll),
+ kKeepLogFileNum(keep_log_file_num),
+ cached_now(static_cast<uint64_t>(clock_->NowMicros() * 1e-6)),
+ ctime_(cached_now),
+ cached_now_access_count(0),
+ call_NowMicros_every_N_records_(100),
+ mutex_() {
+ Status s = fs->GetAbsolutePath(dbname, io_options_, &db_absolute_path_,
+ &io_context_);
+ if (s.IsNotSupported()) {
+ db_absolute_path_ = dbname;
+ } else {
+ status_ = s;
+ }
+ log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_);
+ if (fs_->FileExists(log_fname_, io_options_, &io_context_).ok()) {
+ RollLogFile();
+ }
+ GetExistingFiles();
+ s = ResetLogger();
+ if (s.ok() && status_.ok()) {
+ status_ = TrimOldLogFiles();
+ }
+}
+
+Status AutoRollLogger::ResetLogger() {
+ TEST_SYNC_POINT("AutoRollLogger::ResetLogger:BeforeNewLogger");
+ status_ = fs_->NewLogger(log_fname_, io_options_, &logger_, &io_context_);
+ TEST_SYNC_POINT("AutoRollLogger::ResetLogger:AfterNewLogger");
+
+ if (!status_.ok()) {
+ return status_;
+ }
+ assert(logger_);
+ logger_->SetInfoLogLevel(Logger::GetInfoLogLevel());
+
+ if (logger_->GetLogFileSize() == Logger::kDoNotSupportGetLogFileSize) {
+ status_ = Status::NotSupported(
+ "The underlying logger doesn't support GetLogFileSize()");
+ }
+ if (status_.ok()) {
+ cached_now = static_cast<uint64_t>(clock_->NowMicros() * 1e-6);
+ ctime_ = cached_now;
+ cached_now_access_count = 0;
+ }
+
+ return status_;
+}
+
+void AutoRollLogger::RollLogFile() {
+ // This function is called when log is rotating. Two rotations
+ // can happen quickly (NowMicro returns same value). To not overwrite
+ // previous log file we increment by one micro second and try again.
+ uint64_t now = clock_->NowMicros();
+ std::string old_fname;
+ do {
+ old_fname =
+ OldInfoLogFileName(dbname_, now, db_absolute_path_, db_log_dir_);
+ now++;
+ } while (fs_->FileExists(old_fname, io_options_, &io_context_).ok());
+ // Wait for logger_ reference count to turn to 1 as it might be pinned by
+ // Flush. Pinned Logger can't be closed till Flush is completed on that
+ // Logger.
+ while (logger_.use_count() > 1) {
+ }
+ // Close the existing logger first to release the existing handle
+ // before renaming the file using the file system. If this call
+ // fails there is nothing much we can do and we will continue with the
+ // rename and hence ignoring the result status.
+ if (logger_) {
+ logger_->Close().PermitUncheckedError();
+ }
+ Status s = fs_->RenameFile(log_fname_, old_fname, io_options_, &io_context_);
+ if (!s.ok()) {
+ // What should we do on error?
+ }
+ old_log_files_.push(old_fname);
+}
+
+void AutoRollLogger::GetExistingFiles() {
+ {
+ // Empty the queue to avoid duplicated entries in the queue.
+ std::queue<std::string> empty;
+ std::swap(old_log_files_, empty);
+ }
+
+ std::string parent_dir;
+ std::vector<std::string> info_log_files;
+ Status s =
+ GetInfoLogFiles(fs_, db_log_dir_, dbname_, &parent_dir, &info_log_files);
+ if (status_.ok()) {
+ status_ = s;
+ }
+ // We need to sort the file before enqueing it so that when we
+ // delete file from the front, it is the oldest file.
+ std::sort(info_log_files.begin(), info_log_files.end());
+
+ for (const std::string& f : info_log_files) {
+ old_log_files_.push(parent_dir + "/" + f);
+ }
+}
+
+Status AutoRollLogger::TrimOldLogFiles() {
+ // Here we directly list info files and delete them through FileSystem.
+ // The deletion isn't going through DB, so there are shortcomes:
+ // 1. the deletion is not rate limited by SstFileManager
+ // 2. there is a chance that an I/O will be issued here
+ // Since it's going to be complicated to pass DB object down to
+ // here, we take a simple approach to keep the code easier to
+ // maintain.
+
+ // old_log_files_.empty() is helpful for the corner case that
+ // kKeepLogFileNum == 0. We can instead check kKeepLogFileNum != 0 but
+ // it's essentially the same thing, and checking empty before accessing
+ // the queue feels safer.
+ while (!old_log_files_.empty() && old_log_files_.size() >= kKeepLogFileNum) {
+ Status s =
+ fs_->DeleteFile(old_log_files_.front(), io_options_, &io_context_);
+ // Remove the file from the tracking anyway. It's possible that
+ // DB cleaned up the old log file, or people cleaned it up manually.
+ old_log_files_.pop();
+ // To make the file really go away, we should sync parent directory.
+ // Since there isn't any consistency issue involved here, skipping
+ // this part to avoid one I/O here.
+ if (!s.ok()) {
+ return s;
+ }
+ }
+ return Status::OK();
+}
+
+std::string AutoRollLogger::ValistToString(const char* format,
+ va_list args) const {
+ // Any log messages longer than 1024 will get truncated.
+ // The user is responsible for chopping longer messages into multi line log
+ static const int MAXBUFFERSIZE = 1024;
+ char buffer[MAXBUFFERSIZE];
+
+ int count = vsnprintf(buffer, MAXBUFFERSIZE, format, args);
+ (void)count;
+ assert(count >= 0);
+
+ return buffer;
+}
+
+void AutoRollLogger::LogInternal(const char* format, ...) {
+ mutex_.AssertHeld();
+
+ if (!logger_) {
+ return;
+ }
+
+ va_list args;
+ va_start(args, format);
+ logger_->Logv(format, args);
+ va_end(args);
+}
+
+void AutoRollLogger::Logv(const char* format, va_list ap) {
+ assert(GetStatus().ok());
+ if (!logger_) {
+ return;
+ }
+
+ std::shared_ptr<Logger> logger;
+ {
+ MutexLock l(&mutex_);
+ if ((kLogFileTimeToRoll > 0 && LogExpired()) ||
+ (kMaxLogFileSize > 0 && logger_->GetLogFileSize() >= kMaxLogFileSize)) {
+ RollLogFile();
+ Status s = ResetLogger();
+ Status s2 = TrimOldLogFiles();
+
+ if (!s.ok()) {
+ // can't really log the error if creating a new LOG file failed
+ return;
+ }
+
+ WriteHeaderInfo();
+
+ if (!s2.ok()) {
+ ROCKS_LOG_WARN(logger.get(), "Fail to trim old info log file: %s",
+ s2.ToString().c_str());
+ }
+ }
+
+ // pin down the current logger_ instance before releasing the mutex.
+ logger = logger_;
+ }
+
+ // Another thread could have put a new Logger instance into logger_ by now.
+ // However, since logger is still hanging on to the previous instance
+ // (reference count is not zero), we don't have to worry about it being
+ // deleted while we are accessing it.
+ // Note that logv itself is not mutex protected to allow maximum concurrency,
+ // as thread safety should have been handled by the underlying logger.
+ logger->Logv(format, ap);
+}
+
+void AutoRollLogger::WriteHeaderInfo() {
+ mutex_.AssertHeld();
+ for (auto& header : headers_) {
+ LogInternal("%s", header.c_str());
+ }
+}
+
+void AutoRollLogger::LogHeader(const char* format, va_list args) {
+ if (!logger_) {
+ return;
+ }
+
+ // header message are to be retained in memory. Since we cannot make any
+ // assumptions about the data contained in va_list, we will retain them as
+ // strings
+ va_list tmp;
+ va_copy(tmp, args);
+ std::string data = ValistToString(format, tmp);
+ va_end(tmp);
+
+ MutexLock l(&mutex_);
+ headers_.push_back(data);
+
+ // Log the original message to the current log
+ logger_->Logv(format, args);
+}
+
+bool AutoRollLogger::LogExpired() {
+ if (cached_now_access_count >= call_NowMicros_every_N_records_) {
+ cached_now = static_cast<uint64_t>(clock_->NowMicros() * 1e-6);
+ cached_now_access_count = 0;
+ }
+
+ ++cached_now_access_count;
+ return cached_now >= ctime_ + kLogFileTimeToRoll;
+}
+#endif // !ROCKSDB_LITE
+
+Status CreateLoggerFromOptions(const std::string& dbname,
+ const DBOptions& options,
+ std::shared_ptr<Logger>* logger) {
+ if (options.info_log) {
+ *logger = options.info_log;
+ return Status::OK();
+ }
+
+ Env* env = options.env;
+ std::string db_absolute_path;
+ Status s = env->GetAbsolutePath(dbname, &db_absolute_path);
+ TEST_SYNC_POINT_CALLBACK("rocksdb::CreateLoggerFromOptions:AfterGetPath", &s);
+ if (!s.ok()) {
+ return s;
+ }
+ std::string fname =
+ InfoLogFileName(dbname, db_absolute_path, options.db_log_dir);
+
+ const auto& clock = env->GetSystemClock();
+ // In case it does not exist.
+ s = env->CreateDirIfMissing(dbname);
+ if (!s.ok()) {
+ if (options.db_log_dir.empty()) {
+ return s;
+ } else {
+ // Ignore the error returned during creation of dbname because dbname and
+ // db_log_dir can be on different filesystems in which case dbname will
+ // not exist and error should be ignored. db_log_dir creation will handle
+ // the error in case there is any error in the creation of dbname on same
+ // filesystem.
+ s = Status::OK();
+ }
+ }
+ assert(s.ok());
+
+ if (!options.db_log_dir.empty()) {
+ s = env->CreateDirIfMissing(options.db_log_dir);
+ if (!s.ok()) {
+ return s;
+ }
+ }
+#ifndef ROCKSDB_LITE
+ // Currently we only support roll by time-to-roll and log size
+ if (options.log_file_time_to_roll > 0 || options.max_log_file_size > 0) {
+ AutoRollLogger* result = new AutoRollLogger(
+ env->GetFileSystem(), clock, dbname, options.db_log_dir,
+ options.max_log_file_size, options.log_file_time_to_roll,
+ options.keep_log_file_num, options.info_log_level);
+ s = result->GetStatus();
+ if (!s.ok()) {
+ delete result;
+ } else {
+ logger->reset(result);
+ }
+ return s;
+ }
+#endif // !ROCKSDB_LITE
+ // Open a log file in the same directory as the db
+ s = env->FileExists(fname);
+ if (s.ok()) {
+ s = env->RenameFile(
+ fname, OldInfoLogFileName(dbname, clock->NowMicros(), db_absolute_path,
+ options.db_log_dir));
+
+ // The operation sequence of "FileExists -> Rename" is not atomic. It's
+ // possible that FileExists returns OK but file gets deleted before Rename.
+ // This can cause Rename to return IOError with subcode PathNotFound.
+ // Although it may be a rare case and applications should be discouraged
+ // to not concurrently modifying the contents of the directories accessed
+ // by the database instance, it is still helpful if we can perform some
+ // simple handling of this case. Therefore, we do the following:
+ // 1. if Rename() returns IOError with PathNotFound subcode, then we check
+ // whether the source file, i.e. LOG, exists.
+ // 2. if LOG exists, it means Rename() failed due to something else. Then
+ // we report error.
+ // 3. if LOG does not exist, it means it may have been removed/renamed by
+ // someone else. Since it does not exist, we can reset Status to OK so
+ // that this caller can try creating a new LOG file. If this succeeds,
+ // we should still allow it.
+ if (s.IsPathNotFound()) {
+ s = env->FileExists(fname);
+ if (s.IsNotFound()) {
+ s = Status::OK();
+ }
+ }
+ } else if (s.IsNotFound()) {
+ // "LOG" is not required to exist since this could be a new DB.
+ s = Status::OK();
+ }
+ if (s.ok()) {
+ s = env->NewLogger(fname, logger);
+ }
+ if (s.ok() && logger->get() != nullptr) {
+ (*logger)->SetInfoLogLevel(options.info_log_level);
+ }
+ return s;
+}
+
+} // namespace ROCKSDB_NAMESPACE