diff options
Diffstat (limited to 'src/rocksdb/logging/auto_roll_logger.cc')
-rw-r--r-- | src/rocksdb/logging/auto_roll_logger.cc | 372 |
1 files changed, 372 insertions, 0 deletions
diff --git a/src/rocksdb/logging/auto_roll_logger.cc b/src/rocksdb/logging/auto_roll_logger.cc new file mode 100644 index 000000000..fe0958479 --- /dev/null +++ b/src/rocksdb/logging/auto_roll_logger.cc @@ -0,0 +1,372 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "logging/auto_roll_logger.h" + +#include <algorithm> + +#include "file/filename.h" +#include "logging/logging.h" +#include "rocksdb/env.h" +#include "rocksdb/file_system.h" +#include "rocksdb/system_clock.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +// -- AutoRollLogger + +AutoRollLogger::AutoRollLogger(const std::shared_ptr<FileSystem>& fs, + const std::shared_ptr<SystemClock>& clock, + const std::string& dbname, + const std::string& db_log_dir, + size_t log_max_size, + size_t log_file_time_to_roll, + size_t keep_log_file_num, + const InfoLogLevel log_level) + : Logger(log_level), + dbname_(dbname), + db_log_dir_(db_log_dir), + fs_(fs), + clock_(clock), + status_(Status::OK()), + kMaxLogFileSize(log_max_size), + kLogFileTimeToRoll(log_file_time_to_roll), + kKeepLogFileNum(keep_log_file_num), + cached_now(static_cast<uint64_t>(clock_->NowMicros() * 1e-6)), + ctime_(cached_now), + cached_now_access_count(0), + call_NowMicros_every_N_records_(100), + mutex_() { + Status s = fs->GetAbsolutePath(dbname, io_options_, &db_absolute_path_, + &io_context_); + if (s.IsNotSupported()) { + db_absolute_path_ = dbname; + } else { + status_ = s; + } + log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_); + if (fs_->FileExists(log_fname_, io_options_, &io_context_).ok()) { + RollLogFile(); + } + GetExistingFiles(); + s = ResetLogger(); + if (s.ok() && status_.ok()) { + status_ = TrimOldLogFiles(); + } +} + +Status AutoRollLogger::ResetLogger() { + TEST_SYNC_POINT("AutoRollLogger::ResetLogger:BeforeNewLogger"); + status_ = fs_->NewLogger(log_fname_, io_options_, &logger_, &io_context_); + TEST_SYNC_POINT("AutoRollLogger::ResetLogger:AfterNewLogger"); + + if (!status_.ok()) { + return status_; + } + assert(logger_); + logger_->SetInfoLogLevel(Logger::GetInfoLogLevel()); + + if (logger_->GetLogFileSize() == Logger::kDoNotSupportGetLogFileSize) { + status_ = Status::NotSupported( + "The underlying logger doesn't support GetLogFileSize()"); + } + if (status_.ok()) { + cached_now = static_cast<uint64_t>(clock_->NowMicros() * 1e-6); + ctime_ = cached_now; + cached_now_access_count = 0; + } + + return status_; +} + +void AutoRollLogger::RollLogFile() { + // This function is called when log is rotating. Two rotations + // can happen quickly (NowMicro returns same value). To not overwrite + // previous log file we increment by one micro second and try again. + uint64_t now = clock_->NowMicros(); + std::string old_fname; + do { + old_fname = + OldInfoLogFileName(dbname_, now, db_absolute_path_, db_log_dir_); + now++; + } while (fs_->FileExists(old_fname, io_options_, &io_context_).ok()); + // Wait for logger_ reference count to turn to 1 as it might be pinned by + // Flush. Pinned Logger can't be closed till Flush is completed on that + // Logger. + while (logger_.use_count() > 1) { + } + // Close the existing logger first to release the existing handle + // before renaming the file using the file system. If this call + // fails there is nothing much we can do and we will continue with the + // rename and hence ignoring the result status. + if (logger_) { + logger_->Close().PermitUncheckedError(); + } + Status s = fs_->RenameFile(log_fname_, old_fname, io_options_, &io_context_); + if (!s.ok()) { + // What should we do on error? + } + old_log_files_.push(old_fname); +} + +void AutoRollLogger::GetExistingFiles() { + { + // Empty the queue to avoid duplicated entries in the queue. + std::queue<std::string> empty; + std::swap(old_log_files_, empty); + } + + std::string parent_dir; + std::vector<std::string> info_log_files; + Status s = + GetInfoLogFiles(fs_, db_log_dir_, dbname_, &parent_dir, &info_log_files); + if (status_.ok()) { + status_ = s; + } + // We need to sort the file before enqueing it so that when we + // delete file from the front, it is the oldest file. + std::sort(info_log_files.begin(), info_log_files.end()); + + for (const std::string& f : info_log_files) { + old_log_files_.push(parent_dir + "/" + f); + } +} + +Status AutoRollLogger::TrimOldLogFiles() { + // Here we directly list info files and delete them through FileSystem. + // The deletion isn't going through DB, so there are shortcomes: + // 1. the deletion is not rate limited by SstFileManager + // 2. there is a chance that an I/O will be issued here + // Since it's going to be complicated to pass DB object down to + // here, we take a simple approach to keep the code easier to + // maintain. + + // old_log_files_.empty() is helpful for the corner case that + // kKeepLogFileNum == 0. We can instead check kKeepLogFileNum != 0 but + // it's essentially the same thing, and checking empty before accessing + // the queue feels safer. + while (!old_log_files_.empty() && old_log_files_.size() >= kKeepLogFileNum) { + Status s = + fs_->DeleteFile(old_log_files_.front(), io_options_, &io_context_); + // Remove the file from the tracking anyway. It's possible that + // DB cleaned up the old log file, or people cleaned it up manually. + old_log_files_.pop(); + // To make the file really go away, we should sync parent directory. + // Since there isn't any consistency issue involved here, skipping + // this part to avoid one I/O here. + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + +std::string AutoRollLogger::ValistToString(const char* format, + va_list args) const { + // Any log messages longer than 1024 will get truncated. + // The user is responsible for chopping longer messages into multi line log + static const int MAXBUFFERSIZE = 1024; + char buffer[MAXBUFFERSIZE]; + + int count = vsnprintf(buffer, MAXBUFFERSIZE, format, args); + (void)count; + assert(count >= 0); + + return buffer; +} + +void AutoRollLogger::LogInternal(const char* format, ...) { + mutex_.AssertHeld(); + + if (!logger_) { + return; + } + + va_list args; + va_start(args, format); + logger_->Logv(format, args); + va_end(args); +} + +void AutoRollLogger::Logv(const char* format, va_list ap) { + assert(GetStatus().ok()); + if (!logger_) { + return; + } + + std::shared_ptr<Logger> logger; + { + MutexLock l(&mutex_); + if ((kLogFileTimeToRoll > 0 && LogExpired()) || + (kMaxLogFileSize > 0 && logger_->GetLogFileSize() >= kMaxLogFileSize)) { + RollLogFile(); + Status s = ResetLogger(); + Status s2 = TrimOldLogFiles(); + + if (!s.ok()) { + // can't really log the error if creating a new LOG file failed + return; + } + + WriteHeaderInfo(); + + if (!s2.ok()) { + ROCKS_LOG_WARN(logger.get(), "Fail to trim old info log file: %s", + s2.ToString().c_str()); + } + } + + // pin down the current logger_ instance before releasing the mutex. + logger = logger_; + } + + // Another thread could have put a new Logger instance into logger_ by now. + // However, since logger is still hanging on to the previous instance + // (reference count is not zero), we don't have to worry about it being + // deleted while we are accessing it. + // Note that logv itself is not mutex protected to allow maximum concurrency, + // as thread safety should have been handled by the underlying logger. + logger->Logv(format, ap); +} + +void AutoRollLogger::WriteHeaderInfo() { + mutex_.AssertHeld(); + for (auto& header : headers_) { + LogInternal("%s", header.c_str()); + } +} + +void AutoRollLogger::LogHeader(const char* format, va_list args) { + if (!logger_) { + return; + } + + // header message are to be retained in memory. Since we cannot make any + // assumptions about the data contained in va_list, we will retain them as + // strings + va_list tmp; + va_copy(tmp, args); + std::string data = ValistToString(format, tmp); + va_end(tmp); + + MutexLock l(&mutex_); + headers_.push_back(data); + + // Log the original message to the current log + logger_->Logv(format, args); +} + +bool AutoRollLogger::LogExpired() { + if (cached_now_access_count >= call_NowMicros_every_N_records_) { + cached_now = static_cast<uint64_t>(clock_->NowMicros() * 1e-6); + cached_now_access_count = 0; + } + + ++cached_now_access_count; + return cached_now >= ctime_ + kLogFileTimeToRoll; +} +#endif // !ROCKSDB_LITE + +Status CreateLoggerFromOptions(const std::string& dbname, + const DBOptions& options, + std::shared_ptr<Logger>* logger) { + if (options.info_log) { + *logger = options.info_log; + return Status::OK(); + } + + Env* env = options.env; + std::string db_absolute_path; + Status s = env->GetAbsolutePath(dbname, &db_absolute_path); + TEST_SYNC_POINT_CALLBACK("rocksdb::CreateLoggerFromOptions:AfterGetPath", &s); + if (!s.ok()) { + return s; + } + std::string fname = + InfoLogFileName(dbname, db_absolute_path, options.db_log_dir); + + const auto& clock = env->GetSystemClock(); + // In case it does not exist. + s = env->CreateDirIfMissing(dbname); + if (!s.ok()) { + if (options.db_log_dir.empty()) { + return s; + } else { + // Ignore the error returned during creation of dbname because dbname and + // db_log_dir can be on different filesystems in which case dbname will + // not exist and error should be ignored. db_log_dir creation will handle + // the error in case there is any error in the creation of dbname on same + // filesystem. + s = Status::OK(); + } + } + assert(s.ok()); + + if (!options.db_log_dir.empty()) { + s = env->CreateDirIfMissing(options.db_log_dir); + if (!s.ok()) { + return s; + } + } +#ifndef ROCKSDB_LITE + // Currently we only support roll by time-to-roll and log size + if (options.log_file_time_to_roll > 0 || options.max_log_file_size > 0) { + AutoRollLogger* result = new AutoRollLogger( + env->GetFileSystem(), clock, dbname, options.db_log_dir, + options.max_log_file_size, options.log_file_time_to_roll, + options.keep_log_file_num, options.info_log_level); + s = result->GetStatus(); + if (!s.ok()) { + delete result; + } else { + logger->reset(result); + } + return s; + } +#endif // !ROCKSDB_LITE + // Open a log file in the same directory as the db + s = env->FileExists(fname); + if (s.ok()) { + s = env->RenameFile( + fname, OldInfoLogFileName(dbname, clock->NowMicros(), db_absolute_path, + options.db_log_dir)); + + // The operation sequence of "FileExists -> Rename" is not atomic. It's + // possible that FileExists returns OK but file gets deleted before Rename. + // This can cause Rename to return IOError with subcode PathNotFound. + // Although it may be a rare case and applications should be discouraged + // to not concurrently modifying the contents of the directories accessed + // by the database instance, it is still helpful if we can perform some + // simple handling of this case. Therefore, we do the following: + // 1. if Rename() returns IOError with PathNotFound subcode, then we check + // whether the source file, i.e. LOG, exists. + // 2. if LOG exists, it means Rename() failed due to something else. Then + // we report error. + // 3. if LOG does not exist, it means it may have been removed/renamed by + // someone else. Since it does not exist, we can reset Status to OK so + // that this caller can try creating a new LOG file. If this succeeds, + // we should still allow it. + if (s.IsPathNotFound()) { + s = env->FileExists(fname); + if (s.IsNotFound()) { + s = Status::OK(); + } + } + } else if (s.IsNotFound()) { + // "LOG" is not required to exist since this could be a new DB. + s = Status::OK(); + } + if (s.ok()) { + s = env->NewLogger(fname, logger); + } + if (s.ok() && logger->get() != nullptr) { + (*logger)->SetInfoLogLevel(options.info_log_level); + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE |