diff options
Diffstat (limited to 'src/rocksdb/logging')
-rw-r--r-- | src/rocksdb/logging/auto_roll_logger.cc | 292 | ||||
-rw-r--r-- | src/rocksdb/logging/auto_roll_logger.h | 164 | ||||
-rw-r--r-- | src/rocksdb/logging/auto_roll_logger_test.cc | 685 | ||||
-rw-r--r-- | src/rocksdb/logging/env_logger.h | 165 | ||||
-rw-r--r-- | src/rocksdb/logging/env_logger_test.cc | 162 | ||||
-rw-r--r-- | src/rocksdb/logging/event_logger.cc | 71 | ||||
-rw-r--r-- | src/rocksdb/logging/event_logger.h | 203 | ||||
-rw-r--r-- | src/rocksdb/logging/event_logger_test.cc | 43 | ||||
-rw-r--r-- | src/rocksdb/logging/log_buffer.cc | 92 | ||||
-rw-r--r-- | src/rocksdb/logging/log_buffer.h | 56 | ||||
-rw-r--r-- | src/rocksdb/logging/logging.h | 68 | ||||
-rw-r--r-- | src/rocksdb/logging/posix_logger.h | 185 |
12 files changed, 2186 insertions, 0 deletions
diff --git a/src/rocksdb/logging/auto_roll_logger.cc b/src/rocksdb/logging/auto_roll_logger.cc new file mode 100644 index 000000000..fc498521b --- /dev/null +++ b/src/rocksdb/logging/auto_roll_logger.cc @@ -0,0 +1,292 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +#include "logging/auto_roll_logger.h" + +#include <algorithm> +#include "file/filename.h" +#include "logging/logging.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +// -- AutoRollLogger + +AutoRollLogger::AutoRollLogger(Env* env, const std::string& dbname, + const std::string& db_log_dir, + size_t log_max_size, + size_t log_file_time_to_roll, + size_t keep_log_file_num, + const InfoLogLevel log_level) + : Logger(log_level), + dbname_(dbname), + db_log_dir_(db_log_dir), + env_(env), + status_(Status::OK()), + kMaxLogFileSize(log_max_size), + kLogFileTimeToRoll(log_file_time_to_roll), + kKeepLogFileNum(keep_log_file_num), + cached_now(static_cast<uint64_t>(env_->NowMicros() * 1e-6)), + ctime_(cached_now), + cached_now_access_count(0), + call_NowMicros_every_N_records_(100), + mutex_() { + Status s = env->GetAbsolutePath(dbname, &db_absolute_path_); + if (s.IsNotSupported()) { + db_absolute_path_ = dbname; + } else { + status_ = s; + } + log_fname_ = InfoLogFileName(dbname_, db_absolute_path_, db_log_dir_); + if (env_->FileExists(log_fname_).ok()) { + RollLogFile(); + } + GetExistingFiles(); + ResetLogger(); + if (status_.ok()) { + status_ = TrimOldLogFiles(); + } +} + +Status AutoRollLogger::ResetLogger() { + TEST_SYNC_POINT("AutoRollLogger::ResetLogger:BeforeNewLogger"); + status_ = env_->NewLogger(log_fname_, &logger_); + TEST_SYNC_POINT("AutoRollLogger::ResetLogger:AfterNewLogger"); + + if (!status_.ok()) { + return status_; + } + assert(logger_); + logger_->SetInfoLogLevel(Logger::GetInfoLogLevel()); + + if (logger_->GetLogFileSize() == Logger::kDoNotSupportGetLogFileSize) { + status_ = Status::NotSupported( + "The underlying logger doesn't support GetLogFileSize()"); + } + if (status_.ok()) { + cached_now = static_cast<uint64_t>(env_->NowMicros() * 1e-6); + ctime_ = cached_now; + cached_now_access_count = 0; + } + + return status_; +} + +void AutoRollLogger::RollLogFile() { + // This function is called when log is rotating. Two rotations + // can happen quickly (NowMicro returns same value). To not overwrite + // previous log file we increment by one micro second and try again. + uint64_t now = env_->NowMicros(); + std::string old_fname; + do { + old_fname = OldInfoLogFileName( + dbname_, now, db_absolute_path_, db_log_dir_); + now++; + } while (env_->FileExists(old_fname).ok()); + env_->RenameFile(log_fname_, old_fname); + old_log_files_.push(old_fname); +} + +void AutoRollLogger::GetExistingFiles() { + { + // Empty the queue to avoid duplicated entries in the queue. + std::queue<std::string> empty; + std::swap(old_log_files_, empty); + } + + std::string parent_dir; + std::vector<std::string> info_log_files; + Status s = + GetInfoLogFiles(env_, db_log_dir_, dbname_, &parent_dir, &info_log_files); + if (status_.ok()) { + status_ = s; + } + // We need to sort the file before enqueing it so that when we + // delete file from the front, it is the oldest file. + std::sort(info_log_files.begin(), info_log_files.end()); + + for (const std::string& f : info_log_files) { + old_log_files_.push(parent_dir + "/" + f); + } +} + +Status AutoRollLogger::TrimOldLogFiles() { + // Here we directly list info files and delete them through Env. + // The deletion isn't going through DB, so there are shortcomes: + // 1. the deletion is not rate limited by SstFileManager + // 2. there is a chance that an I/O will be issued here + // Since it's going to be complicated to pass DB object down to + // here, we take a simple approach to keep the code easier to + // maintain. + + // old_log_files_.empty() is helpful for the corner case that + // kKeepLogFileNum == 0. We can instead check kKeepLogFileNum != 0 but + // it's essentially the same thing, and checking empty before accessing + // the queue feels safer. + while (!old_log_files_.empty() && old_log_files_.size() >= kKeepLogFileNum) { + Status s = env_->DeleteFile(old_log_files_.front()); + // Remove the file from the tracking anyway. It's possible that + // DB cleaned up the old log file, or people cleaned it up manually. + old_log_files_.pop(); + // To make the file really go away, we should sync parent directory. + // Since there isn't any consistency issue involved here, skipping + // this part to avoid one I/O here. + if (!s.ok()) { + return s; + } + } + return Status::OK(); +} + +std::string AutoRollLogger::ValistToString(const char* format, + va_list args) const { + // Any log messages longer than 1024 will get truncated. + // The user is responsible for chopping longer messages into multi line log + static const int MAXBUFFERSIZE = 1024; + char buffer[MAXBUFFERSIZE]; + + int count = vsnprintf(buffer, MAXBUFFERSIZE, format, args); + (void) count; + assert(count >= 0); + + return buffer; +} + +void AutoRollLogger::LogInternal(const char* format, ...) { + mutex_.AssertHeld(); + + if (!logger_) { + return; + } + + va_list args; + va_start(args, format); + logger_->Logv(format, args); + va_end(args); +} + +void AutoRollLogger::Logv(const char* format, va_list ap) { + assert(GetStatus().ok()); + if (!logger_) { + return; + } + + std::shared_ptr<Logger> logger; + { + MutexLock l(&mutex_); + if ((kLogFileTimeToRoll > 0 && LogExpired()) || + (kMaxLogFileSize > 0 && logger_->GetLogFileSize() >= kMaxLogFileSize)) { + RollLogFile(); + Status s = ResetLogger(); + Status s2 = TrimOldLogFiles(); + + if (!s.ok()) { + // can't really log the error if creating a new LOG file failed + return; + } + + WriteHeaderInfo(); + + if (!s2.ok()) { + ROCKS_LOG_WARN(logger.get(), "Fail to trim old info log file: %s", + s2.ToString().c_str()); + } + } + + // pin down the current logger_ instance before releasing the mutex. + logger = logger_; + } + + // Another thread could have put a new Logger instance into logger_ by now. + // However, since logger is still hanging on to the previous instance + // (reference count is not zero), we don't have to worry about it being + // deleted while we are accessing it. + // Note that logv itself is not mutex protected to allow maximum concurrency, + // as thread safety should have been handled by the underlying logger. + logger->Logv(format, ap); +} + +void AutoRollLogger::WriteHeaderInfo() { + mutex_.AssertHeld(); + for (auto& header : headers_) { + LogInternal("%s", header.c_str()); + } +} + +void AutoRollLogger::LogHeader(const char* format, va_list args) { + if (!logger_) { + return; + } + + // header message are to be retained in memory. Since we cannot make any + // assumptions about the data contained in va_list, we will retain them as + // strings + va_list tmp; + va_copy(tmp, args); + std::string data = ValistToString(format, tmp); + va_end(tmp); + + MutexLock l(&mutex_); + headers_.push_back(data); + + // Log the original message to the current log + logger_->Logv(format, args); +} + +bool AutoRollLogger::LogExpired() { + if (cached_now_access_count >= call_NowMicros_every_N_records_) { + cached_now = static_cast<uint64_t>(env_->NowMicros() * 1e-6); + cached_now_access_count = 0; + } + + ++cached_now_access_count; + return cached_now >= ctime_ + kLogFileTimeToRoll; +} +#endif // !ROCKSDB_LITE + +Status CreateLoggerFromOptions(const std::string& dbname, + const DBOptions& options, + std::shared_ptr<Logger>* logger) { + if (options.info_log) { + *logger = options.info_log; + return Status::OK(); + } + + Env* env = options.env; + std::string db_absolute_path; + env->GetAbsolutePath(dbname, &db_absolute_path); + std::string fname = + InfoLogFileName(dbname, db_absolute_path, options.db_log_dir); + + env->CreateDirIfMissing(dbname); // In case it does not exist + // Currently we only support roll by time-to-roll and log size +#ifndef ROCKSDB_LITE + if (options.log_file_time_to_roll > 0 || options.max_log_file_size > 0) { + AutoRollLogger* result = new AutoRollLogger( + env, dbname, options.db_log_dir, options.max_log_file_size, + options.log_file_time_to_roll, options.keep_log_file_num, + options.info_log_level); + Status s = result->GetStatus(); + if (!s.ok()) { + delete result; + } else { + logger->reset(result); + } + return s; + } +#endif // !ROCKSDB_LITE + // Open a log file in the same directory as the db + env->RenameFile(fname, + OldInfoLogFileName(dbname, env->NowMicros(), db_absolute_path, + options.db_log_dir)); + auto s = env->NewLogger(fname, logger); + if (logger->get() != nullptr) { + (*logger)->SetInfoLogLevel(options.info_log_level); + } + return s; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/auto_roll_logger.h b/src/rocksdb/logging/auto_roll_logger.h new file mode 100644 index 000000000..5d8c41413 --- /dev/null +++ b/src/rocksdb/logging/auto_roll_logger.h @@ -0,0 +1,164 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#pragma once +#include <list> +#include <queue> +#include <string> + +#include "file/filename.h" +#include "port/port.h" +#include "port/util_logger.h" +#include "test_util/sync_point.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +#ifndef ROCKSDB_LITE +// Rolls the log file by size and/or time +class AutoRollLogger : public Logger { + public: + AutoRollLogger(Env* env, const std::string& dbname, + const std::string& db_log_dir, size_t log_max_size, + size_t log_file_time_to_roll, size_t keep_log_file_num, + const InfoLogLevel log_level = InfoLogLevel::INFO_LEVEL); + + using Logger::Logv; + void Logv(const char* format, va_list ap) override; + + // Write a header entry to the log. All header information will be written + // again every time the log rolls over. + virtual void LogHeader(const char* format, va_list ap) override; + + // check if the logger has encountered any problem. + Status GetStatus() { + return status_; + } + + size_t GetLogFileSize() const override { + if (!logger_) { + return 0; + } + + std::shared_ptr<Logger> logger; + { + MutexLock l(&mutex_); + // pin down the current logger_ instance before releasing the mutex. + logger = logger_; + } + return logger->GetLogFileSize(); + } + + void Flush() override { + std::shared_ptr<Logger> logger; + { + MutexLock l(&mutex_); + // pin down the current logger_ instance before releasing the mutex. + logger = logger_; + } + TEST_SYNC_POINT("AutoRollLogger::Flush:PinnedLogger"); + if (logger) { + logger->Flush(); + } + } + + virtual ~AutoRollLogger() { + if (logger_ && !closed_) { + logger_->Close(); + } + } + + using Logger::GetInfoLogLevel; + InfoLogLevel GetInfoLogLevel() const override { + MutexLock l(&mutex_); + if (!logger_) { + return Logger::GetInfoLogLevel(); + } + return logger_->GetInfoLogLevel(); + } + + using Logger::SetInfoLogLevel; + void SetInfoLogLevel(const InfoLogLevel log_level) override { + MutexLock lock(&mutex_); + Logger::SetInfoLogLevel(log_level); + if (logger_) { + logger_->SetInfoLogLevel(log_level); + } + } + + void SetCallNowMicrosEveryNRecords(uint64_t call_NowMicros_every_N_records) { + call_NowMicros_every_N_records_ = call_NowMicros_every_N_records; + } + + // Expose the log file path for testing purpose + std::string TEST_log_fname() const { + return log_fname_; + } + + uint64_t TEST_ctime() const { return ctime_; } + + Logger* TEST_inner_logger() const { return logger_.get(); } + + protected: + // Implementation of Close() + virtual Status CloseImpl() override { + if (logger_) { + return logger_->Close(); + } else { + return Status::OK(); + } + } + + private: + bool LogExpired(); + Status ResetLogger(); + void RollLogFile(); + // Read all names of old log files into old_log_files_ + // If there is any error, put the error code in status_ + void GetExistingFiles(); + // Delete old log files if it excceeds the limit. + Status TrimOldLogFiles(); + // Log message to logger without rolling + void LogInternal(const char* format, ...); + // Serialize the va_list to a string + std::string ValistToString(const char* format, va_list args) const; + // Write the logs marked as headers to the new log file + void WriteHeaderInfo(); + std::string log_fname_; // Current active info log's file name. + std::string dbname_; + std::string db_log_dir_; + std::string db_absolute_path_; + Env* env_; + std::shared_ptr<Logger> logger_; + // current status of the logger + Status status_; + const size_t kMaxLogFileSize; + const size_t kLogFileTimeToRoll; + const size_t kKeepLogFileNum; + // header information + std::list<std::string> headers_; + // List of all existing info log files. Used for enforcing number of + // info log files. + // Full path is stored here. It consumes signifianctly more memory + // than only storing file name. Can optimize if it causes a problem. + std::queue<std::string> old_log_files_; + // to avoid frequent env->NowMicros() calls, we cached the current time + uint64_t cached_now; + uint64_t ctime_; + uint64_t cached_now_access_count; + uint64_t call_NowMicros_every_N_records_; + mutable port::Mutex mutex_; +}; +#endif // !ROCKSDB_LITE + +// Facade to craete logger automatically +Status CreateLoggerFromOptions(const std::string& dbname, + const DBOptions& options, + std::shared_ptr<Logger>* logger); + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/auto_roll_logger_test.cc b/src/rocksdb/logging/auto_roll_logger_test.cc new file mode 100644 index 000000000..6e72fb90b --- /dev/null +++ b/src/rocksdb/logging/auto_roll_logger_test.cc @@ -0,0 +1,685 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#ifndef ROCKSDB_LITE + +#include "logging/auto_roll_logger.h" +#include <errno.h> +#include <sys/stat.h> +#include <algorithm> +#include <cmath> +#include <fstream> +#include <iostream> +#include <iterator> +#include <string> +#include <thread> +#include <vector> +#include "logging/logging.h" +#include "port/port.h" +#include "rocksdb/db.h" +#include "test_util/sync_point.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { +namespace { +class NoSleepEnv : public EnvWrapper { + public: + NoSleepEnv(Env* base) : EnvWrapper(base) {} + void SleepForMicroseconds(int micros) override { + fake_time_ += static_cast<uint64_t>(micros); + } + + uint64_t NowNanos() override { return fake_time_ * 1000; } + + uint64_t NowMicros() override { return fake_time_; } + + private: + uint64_t fake_time_ = 6666666666; +}; +} // namespace + +// In this test we only want to Log some simple log message with +// no format. LogMessage() provides such a simple interface and +// avoids the [format-security] warning which occurs when you +// call ROCKS_LOG_INFO(logger, log_message) directly. +namespace { +void LogMessage(Logger* logger, const char* message) { + ROCKS_LOG_INFO(logger, "%s", message); +} + +void LogMessage(const InfoLogLevel log_level, Logger* logger, + const char* message) { + Log(log_level, logger, "%s", message); +} +} // namespace + +class AutoRollLoggerTest : public testing::Test { + public: + static void InitTestDb() { +#ifdef OS_WIN + // Replace all slashes in the path so windows CompSpec does not + // become confused + std::string testDir(kTestDir); + std::replace_if(testDir.begin(), testDir.end(), + [](char ch) { return ch == '/'; }, '\\'); + std::string deleteCmd = "if exist " + testDir + " rd /s /q " + testDir; +#else + std::string deleteCmd = "rm -rf " + kTestDir; +#endif + ASSERT_TRUE(system(deleteCmd.c_str()) == 0); + Env::Default()->CreateDir(kTestDir); + } + + void RollLogFileBySizeTest(AutoRollLogger* logger, size_t log_max_size, + const std::string& log_message); + void RollLogFileByTimeTest(Env*, AutoRollLogger* logger, size_t time, + const std::string& log_message); + // return list of files under kTestDir that contains "LOG" + std::vector<std::string> GetLogFiles() { + std::vector<std::string> ret; + std::vector<std::string> files; + Status s = default_env->GetChildren(kTestDir, &files); + // Should call ASSERT_OK() here but it doesn't compile. It's not + // worth the time figuring out why. + EXPECT_TRUE(s.ok()); + for (const auto& f : files) { + if (f.find("LOG") != std::string::npos) { + ret.push_back(f); + } + } + return ret; + } + + // Delete all log files under kTestDir + void CleanupLogFiles() { + for (const std::string& f : GetLogFiles()) { + ASSERT_OK(default_env->DeleteFile(kTestDir + "/" + f)); + } + } + + void RollNTimesBySize(Logger* auto_roll_logger, size_t file_num, + size_t max_log_file_size) { + // Roll the log 4 times, and it will trim to 3 files. + std::string dummy_large_string; + dummy_large_string.assign(max_log_file_size, '='); + auto_roll_logger->SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + for (size_t i = 0; i < file_num + 1; i++) { + // Log enough bytes to trigger at least one roll. + LogMessage(auto_roll_logger, dummy_large_string.c_str()); + LogMessage(auto_roll_logger, ""); + } + } + + static const std::string kSampleMessage; + static const std::string kTestDir; + static const std::string kLogFile; + static Env* default_env; +}; + +const std::string AutoRollLoggerTest::kSampleMessage( + "this is the message to be written to the log file!!"); +const std::string AutoRollLoggerTest::kTestDir( + test::PerThreadDBPath("db_log_test")); +const std::string AutoRollLoggerTest::kLogFile( + test::PerThreadDBPath("db_log_test") + "/LOG"); +Env* AutoRollLoggerTest::default_env = Env::Default(); + +void AutoRollLoggerTest::RollLogFileBySizeTest(AutoRollLogger* logger, + size_t log_max_size, + const std::string& log_message) { + logger->SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + ASSERT_EQ(InfoLogLevel::INFO_LEVEL, logger->GetInfoLogLevel()); + ASSERT_EQ(InfoLogLevel::INFO_LEVEL, + logger->TEST_inner_logger()->GetInfoLogLevel()); + // measure the size of each message, which is supposed + // to be equal or greater than log_message.size() + LogMessage(logger, log_message.c_str()); + size_t message_size = logger->GetLogFileSize(); + size_t current_log_size = message_size; + + // Test the cases when the log file will not be rolled. + while (current_log_size + message_size < log_max_size) { + LogMessage(logger, log_message.c_str()); + current_log_size += message_size; + ASSERT_EQ(current_log_size, logger->GetLogFileSize()); + } + + // Now the log file will be rolled + LogMessage(logger, log_message.c_str()); + // Since rotation is checked before actual logging, we need to + // trigger the rotation by logging another message. + LogMessage(logger, log_message.c_str()); + + ASSERT_TRUE(message_size == logger->GetLogFileSize()); +} + +void AutoRollLoggerTest::RollLogFileByTimeTest(Env* env, AutoRollLogger* logger, + size_t time, + const std::string& log_message) { + uint64_t expected_ctime; + uint64_t actual_ctime; + + uint64_t total_log_size; + EXPECT_OK(env->GetFileSize(kLogFile, &total_log_size)); + expected_ctime = logger->TEST_ctime(); + logger->SetCallNowMicrosEveryNRecords(0); + + // -- Write to the log for several times, which is supposed + // to be finished before time. + for (int i = 0; i < 10; ++i) { + env->SleepForMicroseconds(50000); + LogMessage(logger, log_message.c_str()); + EXPECT_OK(logger->GetStatus()); + // Make sure we always write to the same log file (by + // checking the create time); + + actual_ctime = logger->TEST_ctime(); + + // Also make sure the log size is increasing. + EXPECT_EQ(expected_ctime, actual_ctime); + EXPECT_GT(logger->GetLogFileSize(), total_log_size); + total_log_size = logger->GetLogFileSize(); + } + + // -- Make the log file expire + env->SleepForMicroseconds(static_cast<int>(time * 1000000)); + LogMessage(logger, log_message.c_str()); + + // At this time, the new log file should be created. + actual_ctime = logger->TEST_ctime(); + EXPECT_LT(expected_ctime, actual_ctime); + EXPECT_LT(logger->GetLogFileSize(), total_log_size); +} + +TEST_F(AutoRollLoggerTest, RollLogFileBySize) { + InitTestDb(); + size_t log_max_size = 1024 * 5; + size_t keep_log_file_num = 10; + + AutoRollLogger logger(Env::Default(), kTestDir, "", log_max_size, 0, + keep_log_file_num); + + RollLogFileBySizeTest(&logger, log_max_size, + kSampleMessage + ":RollLogFileBySize"); +} + +TEST_F(AutoRollLoggerTest, RollLogFileByTime) { + NoSleepEnv nse(Env::Default()); + + size_t time = 2; + size_t log_size = 1024 * 5; + size_t keep_log_file_num = 10; + + InitTestDb(); + // -- Test the existence of file during the server restart. + ASSERT_EQ(Status::NotFound(), default_env->FileExists(kLogFile)); + AutoRollLogger logger(&nse, kTestDir, "", log_size, time, keep_log_file_num); + ASSERT_OK(default_env->FileExists(kLogFile)); + + RollLogFileByTimeTest(&nse, &logger, time, + kSampleMessage + ":RollLogFileByTime"); +} + +TEST_F(AutoRollLoggerTest, SetInfoLogLevel) { + InitTestDb(); + Options options; + options.info_log_level = InfoLogLevel::FATAL_LEVEL; + options.max_log_file_size = 1024; + std::shared_ptr<Logger> logger; + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + auto* auto_roll_logger = dynamic_cast<AutoRollLogger*>(logger.get()); + ASSERT_NE(nullptr, auto_roll_logger); + ASSERT_EQ(InfoLogLevel::FATAL_LEVEL, auto_roll_logger->GetInfoLogLevel()); + ASSERT_EQ(InfoLogLevel::FATAL_LEVEL, + auto_roll_logger->TEST_inner_logger()->GetInfoLogLevel()); + auto_roll_logger->SetInfoLogLevel(InfoLogLevel::DEBUG_LEVEL); + ASSERT_EQ(InfoLogLevel::DEBUG_LEVEL, auto_roll_logger->GetInfoLogLevel()); + ASSERT_EQ(InfoLogLevel::DEBUG_LEVEL, logger->GetInfoLogLevel()); + ASSERT_EQ(InfoLogLevel::DEBUG_LEVEL, + auto_roll_logger->TEST_inner_logger()->GetInfoLogLevel()); +} + +TEST_F(AutoRollLoggerTest, OpenLogFilesMultipleTimesWithOptionLog_max_size) { + // If only 'log_max_size' options is specified, then every time + // when rocksdb is restarted, a new empty log file will be created. + InitTestDb(); + // WORKAROUND: + // avoid complier's complaint of "comparison between signed + // and unsigned integer expressions" because literal 0 is + // treated as "singed". + size_t kZero = 0; + size_t log_size = 1024; + size_t keep_log_file_num = 10; + + AutoRollLogger* logger = new AutoRollLogger(Env::Default(), kTestDir, "", + log_size, 0, keep_log_file_num); + + LogMessage(logger, kSampleMessage.c_str()); + ASSERT_GT(logger->GetLogFileSize(), kZero); + delete logger; + + // reopens the log file and an empty log file will be created. + logger = new AutoRollLogger(Env::Default(), kTestDir, "", log_size, 0, 10); + ASSERT_EQ(logger->GetLogFileSize(), kZero); + delete logger; +} + +TEST_F(AutoRollLoggerTest, CompositeRollByTimeAndSizeLogger) { + size_t time = 2, log_max_size = 1024 * 5; + size_t keep_log_file_num = 10; + + InitTestDb(); + + NoSleepEnv nse(Env::Default()); + AutoRollLogger logger(&nse, kTestDir, "", log_max_size, time, + keep_log_file_num); + + // Test the ability to roll by size + RollLogFileBySizeTest(&logger, log_max_size, + kSampleMessage + ":CompositeRollByTimeAndSizeLogger"); + + // Test the ability to roll by Time + RollLogFileByTimeTest(&nse, &logger, time, + kSampleMessage + ":CompositeRollByTimeAndSizeLogger"); +} + +#ifndef OS_WIN +// TODO: does not build for Windows because of PosixLogger use below. Need to +// port +TEST_F(AutoRollLoggerTest, CreateLoggerFromOptions) { + DBOptions options; + NoSleepEnv nse(Env::Default()); + std::shared_ptr<Logger> logger; + + // Normal logger + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + ASSERT_TRUE(dynamic_cast<PosixLogger*>(logger.get())); + + // Only roll by size + InitTestDb(); + options.max_log_file_size = 1024; + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + AutoRollLogger* auto_roll_logger = + dynamic_cast<AutoRollLogger*>(logger.get()); + ASSERT_TRUE(auto_roll_logger); + RollLogFileBySizeTest( + auto_roll_logger, options.max_log_file_size, + kSampleMessage + ":CreateLoggerFromOptions - size"); + + // Only roll by Time + options.env = &nse; + InitTestDb(); + options.max_log_file_size = 0; + options.log_file_time_to_roll = 2; + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + auto_roll_logger = + dynamic_cast<AutoRollLogger*>(logger.get()); + RollLogFileByTimeTest(&nse, auto_roll_logger, options.log_file_time_to_roll, + kSampleMessage + ":CreateLoggerFromOptions - time"); + + // roll by both Time and size + InitTestDb(); + options.max_log_file_size = 1024 * 5; + options.log_file_time_to_roll = 2; + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + auto_roll_logger = + dynamic_cast<AutoRollLogger*>(logger.get()); + RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size, + kSampleMessage + ":CreateLoggerFromOptions - both"); + RollLogFileByTimeTest(&nse, auto_roll_logger, options.log_file_time_to_roll, + kSampleMessage + ":CreateLoggerFromOptions - both"); + + // Set keep_log_file_num + { + const size_t kFileNum = 3; + InitTestDb(); + options.max_log_file_size = 512; + options.log_file_time_to_roll = 2; + options.keep_log_file_num = kFileNum; + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + auto_roll_logger = dynamic_cast<AutoRollLogger*>(logger.get()); + + // Roll the log 4 times, and it will trim to 3 files. + std::string dummy_large_string; + dummy_large_string.assign(options.max_log_file_size, '='); + auto_roll_logger->SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + for (size_t i = 0; i < kFileNum + 1; i++) { + // Log enough bytes to trigger at least one roll. + LogMessage(auto_roll_logger, dummy_large_string.c_str()); + LogMessage(auto_roll_logger, ""); + } + + std::vector<std::string> files = GetLogFiles(); + ASSERT_EQ(kFileNum, files.size()); + + CleanupLogFiles(); + } + + // Set keep_log_file_num and dbname is different from + // db_log_dir. + { + const size_t kFileNum = 3; + InitTestDb(); + options.max_log_file_size = 512; + options.log_file_time_to_roll = 2; + options.keep_log_file_num = kFileNum; + options.db_log_dir = kTestDir; + ASSERT_OK(CreateLoggerFromOptions("/dummy/db/name", options, &logger)); + auto_roll_logger = dynamic_cast<AutoRollLogger*>(logger.get()); + + // Roll the log 4 times, and it will trim to 3 files. + std::string dummy_large_string; + dummy_large_string.assign(options.max_log_file_size, '='); + auto_roll_logger->SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + for (size_t i = 0; i < kFileNum + 1; i++) { + // Log enough bytes to trigger at least one roll. + LogMessage(auto_roll_logger, dummy_large_string.c_str()); + LogMessage(auto_roll_logger, ""); + } + + std::vector<std::string> files = GetLogFiles(); + ASSERT_EQ(kFileNum, files.size()); + for (const auto& f : files) { + ASSERT_TRUE(f.find("dummy") != std::string::npos); + } + + // Cleaning up those files. + CleanupLogFiles(); + } +} + +TEST_F(AutoRollLoggerTest, AutoDeleting) { + for (int attempt = 0; attempt < 2; attempt++) { + // In the first attemp, db_log_dir is not set, while in the + // second it is set. + std::string dbname = (attempt == 0) ? kTestDir : "/test/dummy/dir"; + std::string db_log_dir = (attempt == 0) ? "" : kTestDir; + + InitTestDb(); + const size_t kMaxFileSize = 512; + { + size_t log_num = 8; + AutoRollLogger logger(Env::Default(), dbname, db_log_dir, kMaxFileSize, 0, + log_num); + RollNTimesBySize(&logger, log_num, kMaxFileSize); + + ASSERT_EQ(log_num, GetLogFiles().size()); + } + // Shrink number of files + { + size_t log_num = 5; + AutoRollLogger logger(Env::Default(), dbname, db_log_dir, kMaxFileSize, 0, + log_num); + ASSERT_EQ(log_num, GetLogFiles().size()); + + RollNTimesBySize(&logger, 3, kMaxFileSize); + ASSERT_EQ(log_num, GetLogFiles().size()); + } + + // Increase number of files again. + { + size_t log_num = 7; + AutoRollLogger logger(Env::Default(), dbname, db_log_dir, kMaxFileSize, 0, + log_num); + ASSERT_EQ(6, GetLogFiles().size()); + + RollNTimesBySize(&logger, 3, kMaxFileSize); + ASSERT_EQ(log_num, GetLogFiles().size()); + } + + CleanupLogFiles(); + } +} + +TEST_F(AutoRollLoggerTest, LogFlushWhileRolling) { + DBOptions options; + std::shared_ptr<Logger> logger; + + InitTestDb(); + options.max_log_file_size = 1024 * 5; + ASSERT_OK(CreateLoggerFromOptions(kTestDir, options, &logger)); + AutoRollLogger* auto_roll_logger = + dynamic_cast<AutoRollLogger*>(logger.get()); + ASSERT_TRUE(auto_roll_logger); + ROCKSDB_NAMESPACE::port::Thread flush_thread; + + // Notes: + // (1) Need to pin the old logger before beginning the roll, as rolling grabs + // the mutex, which would prevent us from accessing the old logger. This + // also marks flush_thread with AutoRollLogger::Flush:PinnedLogger. + // (2) Need to reset logger during PosixLogger::Flush() to exercise a race + // condition case, which is executing the flush with the pinned (old) + // logger after auto-roll logger has cut over to a new logger. + // (3) PosixLogger::Flush() happens in both threads but its SyncPoints only + // are enabled in flush_thread (the one pinning the old logger). + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependencyAndMarkers( + {{"AutoRollLogger::Flush:PinnedLogger", + "AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"}, + {"PosixLogger::Flush:Begin1", + "AutoRollLogger::ResetLogger:BeforeNewLogger"}, + {"AutoRollLogger::ResetLogger:AfterNewLogger", + "PosixLogger::Flush:Begin2"}}, + {{"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin1"}, + {"AutoRollLogger::Flush:PinnedLogger", "PosixLogger::Flush:Begin2"}}); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + flush_thread = port::Thread([&]() { auto_roll_logger->Flush(); }); + TEST_SYNC_POINT( + "AutoRollLoggerTest::LogFlushWhileRolling:PreRollAndPostThreadInit"); + RollLogFileBySizeTest(auto_roll_logger, options.max_log_file_size, + kSampleMessage + ":LogFlushWhileRolling"); + flush_thread.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +#endif // OS_WIN + +TEST_F(AutoRollLoggerTest, InfoLogLevel) { + InitTestDb(); + + size_t log_size = 8192; + size_t log_lines = 0; + // an extra-scope to force the AutoRollLogger to flush the log file when it + // becomes out of scope. + { + AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0, 10); + for (int log_level = InfoLogLevel::HEADER_LEVEL; + log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + for (int log_type = InfoLogLevel::DEBUG_LEVEL; + log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) { + // log messages with log level smaller than log_level will not be + // logged. + LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str()); + } + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; + } + for (int log_level = InfoLogLevel::HEADER_LEVEL; + log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + + // again, messages with level smaller than log_level will not be logged. + ROCKS_LOG_HEADER(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_DEBUG(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_INFO(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_WARN(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_ERROR(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_FATAL(&logger, "%s", kSampleMessage.c_str()); + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; + } + } + std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str()); + size_t lines = std::count(std::istreambuf_iterator<char>(inFile), + std::istreambuf_iterator<char>(), '\n'); + ASSERT_EQ(log_lines, lines); + inFile.close(); +} + +TEST_F(AutoRollLoggerTest, Close) { + InitTestDb(); + + size_t log_size = 8192; + size_t log_lines = 0; + AutoRollLogger logger(Env::Default(), kTestDir, "", log_size, 0, 10); + for (int log_level = InfoLogLevel::HEADER_LEVEL; + log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + for (int log_type = InfoLogLevel::DEBUG_LEVEL; + log_type <= InfoLogLevel::HEADER_LEVEL; log_type++) { + // log messages with log level smaller than log_level will not be + // logged. + LogMessage((InfoLogLevel)log_type, &logger, kSampleMessage.c_str()); + } + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; + } + for (int log_level = InfoLogLevel::HEADER_LEVEL; + log_level >= InfoLogLevel::DEBUG_LEVEL; log_level--) { + logger.SetInfoLogLevel((InfoLogLevel)log_level); + + // again, messages with level smaller than log_level will not be logged. + ROCKS_LOG_HEADER(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_DEBUG(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_INFO(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_WARN(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_ERROR(&logger, "%s", kSampleMessage.c_str()); + ROCKS_LOG_FATAL(&logger, "%s", kSampleMessage.c_str()); + log_lines += InfoLogLevel::HEADER_LEVEL - log_level + 1; + } + ASSERT_EQ(logger.Close(), Status::OK()); + + std::ifstream inFile(AutoRollLoggerTest::kLogFile.c_str()); + size_t lines = std::count(std::istreambuf_iterator<char>(inFile), + std::istreambuf_iterator<char>(), '\n'); + ASSERT_EQ(log_lines, lines); + inFile.close(); +} + +// Test the logger Header function for roll over logs +// We expect the new logs creates as roll over to carry the headers specified +static std::vector<std::string> GetOldFileNames(const std::string& path) { + std::vector<std::string> ret; + + const std::string dirname = path.substr(/*start=*/0, path.find_last_of("/")); + const std::string fname = path.substr(path.find_last_of("/") + 1); + + std::vector<std::string> children; + Env::Default()->GetChildren(dirname, &children); + + // We know that the old log files are named [path]<something> + // Return all entities that match the pattern + for (auto& child : children) { + if (fname != child && child.find(fname) == 0) { + ret.push_back(dirname + "/" + child); + } + } + + return ret; +} + +TEST_F(AutoRollLoggerTest, LogHeaderTest) { + static const size_t MAX_HEADERS = 10; + static const size_t LOG_MAX_SIZE = 1024 * 5; + static const std::string HEADER_STR = "Log header line"; + + // test_num == 0 -> standard call to Header() + // test_num == 1 -> call to Log() with InfoLogLevel::HEADER_LEVEL + for (int test_num = 0; test_num < 2; test_num++) { + + InitTestDb(); + + AutoRollLogger logger(Env::Default(), kTestDir, /*db_log_dir=*/"", + LOG_MAX_SIZE, /*log_file_time_to_roll=*/0, + /*keep_log_file_num=*/10); + + if (test_num == 0) { + // Log some headers explicitly using Header() + for (size_t i = 0; i < MAX_HEADERS; i++) { + Header(&logger, "%s %" ROCKSDB_PRIszt, HEADER_STR.c_str(), i); + } + } else if (test_num == 1) { + // HEADER_LEVEL should make this behave like calling Header() + for (size_t i = 0; i < MAX_HEADERS; i++) { + ROCKS_LOG_HEADER(&logger, "%s %" ROCKSDB_PRIszt, HEADER_STR.c_str(), i); + } + } + + const std::string newfname = logger.TEST_log_fname(); + + // Log enough data to cause a roll over + int i = 0; + for (size_t iter = 0; iter < 2; iter++) { + while (logger.GetLogFileSize() < LOG_MAX_SIZE) { + Info(&logger, (kSampleMessage + ":LogHeaderTest line %d").c_str(), i); + ++i; + } + + Info(&logger, "Rollover"); + } + + // Flush the log for the latest file + LogFlush(&logger); + + const auto oldfiles = GetOldFileNames(newfname); + + ASSERT_EQ(oldfiles.size(), (size_t) 2); + + for (auto& oldfname : oldfiles) { + // verify that the files rolled over + ASSERT_NE(oldfname, newfname); + // verify that the old log contains all the header logs + ASSERT_EQ(test::GetLinesCount(oldfname, HEADER_STR), MAX_HEADERS); + } + } +} + +TEST_F(AutoRollLoggerTest, LogFileExistence) { + ROCKSDB_NAMESPACE::DB* db; + ROCKSDB_NAMESPACE::Options options; +#ifdef OS_WIN + // Replace all slashes in the path so windows CompSpec does not + // become confused + std::string testDir(kTestDir); + std::replace_if(testDir.begin(), testDir.end(), + [](char ch) { return ch == '/'; }, '\\'); + std::string deleteCmd = "if exist " + testDir + " rd /s /q " + testDir; +#else + std::string deleteCmd = "rm -rf " + kTestDir; +#endif + ASSERT_EQ(system(deleteCmd.c_str()), 0); + options.max_log_file_size = 100 * 1024 * 1024; + options.create_if_missing = true; + ASSERT_OK(ROCKSDB_NAMESPACE::DB::Open(options, kTestDir, &db)); + ASSERT_OK(default_env->FileExists(kLogFile)); + delete db; +} + +TEST_F(AutoRollLoggerTest, FileCreateFailure) { + Options options; + options.max_log_file_size = 100 * 1024 * 1024; + options.db_log_dir = "/a/dir/does/not/exist/at/all"; + + std::shared_ptr<Logger> logger; + ASSERT_NOK(CreateLoggerFromOptions("", options, &logger)); + ASSERT_TRUE(!logger); +} +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include <stdio.h> + +int main(int /*argc*/, char** /*argv*/) { + fprintf(stderr, + "SKIPPED as AutoRollLogger is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE diff --git a/src/rocksdb/logging/env_logger.h b/src/rocksdb/logging/env_logger.h new file mode 100644 index 000000000..9fecb50cf --- /dev/null +++ b/src/rocksdb/logging/env_logger.h @@ -0,0 +1,165 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Logger implementation that uses custom Env object for logging. + +#pragma once + +#include <time.h> +#include <atomic> +#include <memory> +#include "port/sys_time.h" + +#include "file/writable_file_writer.h" +#include "monitoring/iostats_context_imp.h" +#include "rocksdb/env.h" +#include "rocksdb/slice.h" +#include "test_util/sync_point.h" +#include "util/mutexlock.h" + +namespace ROCKSDB_NAMESPACE { + +class EnvLogger : public Logger { + public: + EnvLogger(std::unique_ptr<FSWritableFile>&& writable_file, + const std::string& fname, const EnvOptions& options, Env* env, + InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) + : Logger(log_level), + file_(std::move(writable_file), fname, options, env), + last_flush_micros_(0), + env_(env), + flush_pending_(false) {} + + ~EnvLogger() { + if (!closed_) { + closed_ = true; + CloseHelper(); + } + } + + private: + void FlushLocked() { + mutex_.AssertHeld(); + if (flush_pending_) { + flush_pending_ = false; + file_.Flush(); + } + last_flush_micros_ = env_->NowMicros(); + } + + void Flush() override { + TEST_SYNC_POINT("EnvLogger::Flush:Begin1"); + TEST_SYNC_POINT("EnvLogger::Flush:Begin2"); + + MutexLock l(&mutex_); + FlushLocked(); + } + + Status CloseImpl() override { return CloseHelper(); } + + Status CloseHelper() { + mutex_.Lock(); + const auto close_status = file_.Close(); + mutex_.Unlock(); + + if (close_status.ok()) { + return close_status; + } + return Status::IOError("Close of log file failed with error:" + + (close_status.getState() + ? std::string(close_status.getState()) + : std::string())); + } + + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + IOSTATS_TIMER_GUARD(logger_nanos); + + const uint64_t thread_id = env_->GetThreadID(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 65536; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, + t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec), + static_cast<long long unsigned int>(thread_id)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + mutex_.Lock(); + // We will ignore any error returned by Append(). + file_.Append(Slice(base, p - base)); + flush_pending_ = true; + const uint64_t now_micros = env_->NowMicros(); + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + FlushLocked(); + } + mutex_.Unlock(); + if (base != buffer) { + delete[] base; + } + break; + } + } + + size_t GetLogFileSize() const override { + MutexLock l(&mutex_); + return file_.GetFileSize(); + } + + private: + WritableFileWriter file_; + mutable port::Mutex mutex_; // Mutex to protect the shared variables below. + const static uint64_t flush_every_seconds_ = 5; + std::atomic_uint_fast64_t last_flush_micros_; + Env* env_; + std::atomic<bool> flush_pending_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/env_logger_test.cc b/src/rocksdb/logging/env_logger_test.cc new file mode 100644 index 000000000..375e2cf5b --- /dev/null +++ b/src/rocksdb/logging/env_logger_test.cc @@ -0,0 +1,162 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// + +#include "logging/env_logger.h" +#include "env/mock_env.h" +#include "test_util/testharness.h" +#include "test_util/testutil.h" + +namespace ROCKSDB_NAMESPACE { + +namespace { +// In this test we only want to Log some simple log message with +// no format. +void LogMessage(std::shared_ptr<Logger> logger, const std::string& message) { + Log(logger, "%s", message.c_str()); +} + +// Helper method to write the message num_times in the given logger. +void WriteLogs(std::shared_ptr<Logger> logger, const std::string& message, + int num_times) { + for (int ii = 0; ii < num_times; ++ii) { + LogMessage(logger, message); + } +} + +} // namespace + +class EnvLoggerTest : public testing::Test { + public: + Env* env_; + + EnvLoggerTest() : env_(Env::Default()) {} + + ~EnvLoggerTest() = default; + + std::shared_ptr<Logger> CreateLogger() { + std::shared_ptr<Logger> result; + assert(NewEnvLogger(kLogFile, env_, &result).ok()); + assert(result); + result->SetInfoLogLevel(InfoLogLevel::INFO_LEVEL); + return result; + } + + void DeleteLogFile() { ASSERT_OK(env_->DeleteFile(kLogFile)); } + + static const std::string kSampleMessage; + static const std::string kTestDir; + static const std::string kLogFile; +}; + +const std::string EnvLoggerTest::kSampleMessage = + "this is the message to be written to the log file!!"; +const std::string EnvLoggerTest::kLogFile = test::PerThreadDBPath("log_file"); + +TEST_F(EnvLoggerTest, EmptyLogFile) { + auto logger = CreateLogger(); + ASSERT_EQ(logger->Close(), Status::OK()); + + // Check the size of the log file. + uint64_t file_size; + ASSERT_EQ(env_->GetFileSize(kLogFile, &file_size), Status::OK()); + ASSERT_EQ(file_size, 0); + DeleteLogFile(); +} + +TEST_F(EnvLoggerTest, LogMultipleLines) { + auto logger = CreateLogger(); + + // Write multiple lines. + const int kNumIter = 10; + WriteLogs(logger, kSampleMessage, kNumIter); + + // Flush the logs. + logger->Flush(); + ASSERT_EQ(logger->Close(), Status::OK()); + + // Validate whether the log file has 'kNumIter' number of lines. + ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), kNumIter); + DeleteLogFile(); +} + +TEST_F(EnvLoggerTest, Overwrite) { + { + auto logger = CreateLogger(); + + // Write multiple lines. + const int kNumIter = 10; + WriteLogs(logger, kSampleMessage, kNumIter); + + ASSERT_EQ(logger->Close(), Status::OK()); + + // Validate whether the log file has 'kNumIter' number of lines. + ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), kNumIter); + } + + // Now reopen the file again. + { + auto logger = CreateLogger(); + + // File should be empty. + uint64_t file_size; + ASSERT_EQ(env_->GetFileSize(kLogFile, &file_size), Status::OK()); + ASSERT_EQ(file_size, 0); + ASSERT_EQ(logger->GetLogFileSize(), 0); + ASSERT_EQ(logger->Close(), Status::OK()); + } + DeleteLogFile(); +} + +TEST_F(EnvLoggerTest, Close) { + auto logger = CreateLogger(); + + // Write multiple lines. + const int kNumIter = 10; + WriteLogs(logger, kSampleMessage, kNumIter); + + ASSERT_EQ(logger->Close(), Status::OK()); + + // Validate whether the log file has 'kNumIter' number of lines. + ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), kNumIter); + DeleteLogFile(); +} + +TEST_F(EnvLoggerTest, ConcurrentLogging) { + auto logger = CreateLogger(); + + const int kNumIter = 20; + std::function<void()> cb = [&]() { + WriteLogs(logger, kSampleMessage, kNumIter); + logger->Flush(); + }; + + // Write to the logs from multiple threads. + std::vector<port::Thread> threads; + const int kNumThreads = 5; + // Create threads. + for (int ii = 0; ii < kNumThreads; ++ii) { + threads.push_back(port::Thread(cb)); + } + + // Wait for them to complete. + for (auto& th : threads) { + th.join(); + } + + ASSERT_EQ(logger->Close(), Status::OK()); + + // Verfiy the log file. + ASSERT_EQ(test::GetLinesCount(kLogFile, kSampleMessage), + kNumIter * kNumThreads); + DeleteLogFile(); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/logging/event_logger.cc b/src/rocksdb/logging/event_logger.cc new file mode 100644 index 000000000..f1747ad25 --- /dev/null +++ b/src/rocksdb/logging/event_logger.cc @@ -0,0 +1,71 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "logging/event_logger.h" + +#include <cassert> +#include <cinttypes> +#include <sstream> +#include <string> + +#include "logging/logging.h" +#include "util/string_util.h" + +namespace ROCKSDB_NAMESPACE { + +EventLoggerStream::EventLoggerStream(Logger* logger) + : logger_(logger), + log_buffer_(nullptr), + max_log_size_(0), + json_writer_(nullptr) {} + +EventLoggerStream::EventLoggerStream(LogBuffer* log_buffer, + const size_t max_log_size) + : logger_(nullptr), + log_buffer_(log_buffer), + max_log_size_(max_log_size), + json_writer_(nullptr) {} + +EventLoggerStream::~EventLoggerStream() { + if (json_writer_) { + json_writer_->EndObject(); +#ifdef ROCKSDB_PRINT_EVENTS_TO_STDOUT + printf("%s\n", json_writer_->Get().c_str()); +#else + if (logger_) { + EventLogger::Log(logger_, *json_writer_); + } else if (log_buffer_) { + assert(max_log_size_); + EventLogger::LogToBuffer(log_buffer_, *json_writer_, max_log_size_); + } +#endif + delete json_writer_; + } +} + +void EventLogger::Log(const JSONWriter& jwriter) { + Log(logger_, jwriter); +} + +void EventLogger::Log(Logger* logger, const JSONWriter& jwriter) { +#ifdef ROCKSDB_PRINT_EVENTS_TO_STDOUT + printf("%s\n", jwriter.Get().c_str()); +#else + ROCKSDB_NAMESPACE::Log(logger, "%s %s", Prefix(), jwriter.Get().c_str()); +#endif +} + +void EventLogger::LogToBuffer(LogBuffer* log_buffer, const JSONWriter& jwriter, + const size_t max_log_size) { +#ifdef ROCKSDB_PRINT_EVENTS_TO_STDOUT + printf("%s\n", jwriter.Get().c_str()); +#else + assert(log_buffer); + ROCKSDB_NAMESPACE::LogToBuffer(log_buffer, max_log_size, "%s %s", Prefix(), + jwriter.Get().c_str()); +#endif +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/event_logger.h b/src/rocksdb/logging/event_logger.h new file mode 100644 index 000000000..596eb0f51 --- /dev/null +++ b/src/rocksdb/logging/event_logger.h @@ -0,0 +1,203 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <memory> +#include <sstream> +#include <string> +#include <chrono> + +#include "logging/log_buffer.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { + +class JSONWriter { + public: + JSONWriter() : state_(kExpectKey), first_element_(true), in_array_(false) { + stream_ << "{"; + } + + void AddKey(const std::string& key) { + assert(state_ == kExpectKey); + if (!first_element_) { + stream_ << ", "; + } + stream_ << "\"" << key << "\": "; + state_ = kExpectValue; + first_element_ = false; + } + + void AddValue(const char* value) { + assert(state_ == kExpectValue || state_ == kInArray); + if (state_ == kInArray && !first_element_) { + stream_ << ", "; + } + stream_ << "\"" << value << "\""; + if (state_ != kInArray) { + state_ = kExpectKey; + } + first_element_ = false; + } + + template <typename T> + void AddValue(const T& value) { + assert(state_ == kExpectValue || state_ == kInArray); + if (state_ == kInArray && !first_element_) { + stream_ << ", "; + } + stream_ << value; + if (state_ != kInArray) { + state_ = kExpectKey; + } + first_element_ = false; + } + + void StartArray() { + assert(state_ == kExpectValue); + state_ = kInArray; + in_array_ = true; + stream_ << "["; + first_element_ = true; + } + + void EndArray() { + assert(state_ == kInArray); + state_ = kExpectKey; + in_array_ = false; + stream_ << "]"; + first_element_ = false; + } + + void StartObject() { + assert(state_ == kExpectValue); + state_ = kExpectKey; + stream_ << "{"; + first_element_ = true; + } + + void EndObject() { + assert(state_ == kExpectKey); + stream_ << "}"; + first_element_ = false; + } + + void StartArrayedObject() { + assert(state_ == kInArray && in_array_); + state_ = kExpectValue; + if (!first_element_) { + stream_ << ", "; + } + StartObject(); + } + + void EndArrayedObject() { + assert(in_array_); + EndObject(); + state_ = kInArray; + } + + std::string Get() const { return stream_.str(); } + + JSONWriter& operator<<(const char* val) { + if (state_ == kExpectKey) { + AddKey(val); + } else { + AddValue(val); + } + return *this; + } + + JSONWriter& operator<<(const std::string& val) { + return *this << val.c_str(); + } + + template <typename T> + JSONWriter& operator<<(const T& val) { + assert(state_ != kExpectKey); + AddValue(val); + return *this; + } + + private: + enum JSONWriterState { + kExpectKey, + kExpectValue, + kInArray, + kInArrayedObject, + }; + JSONWriterState state_; + bool first_element_; + bool in_array_; + std::ostringstream stream_; +}; + +class EventLoggerStream { + public: + template <typename T> + EventLoggerStream& operator<<(const T& val) { + MakeStream(); + *json_writer_ << val; + return *this; + } + + void StartArray() { json_writer_->StartArray(); } + void EndArray() { json_writer_->EndArray(); } + void StartObject() { json_writer_->StartObject(); } + void EndObject() { json_writer_->EndObject(); } + + ~EventLoggerStream(); + + private: + void MakeStream() { + if (!json_writer_) { + json_writer_ = new JSONWriter(); + *this << "time_micros" + << std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::system_clock::now().time_since_epoch()).count(); + } + } + friend class EventLogger; + explicit EventLoggerStream(Logger* logger); + explicit EventLoggerStream(LogBuffer* log_buffer, const size_t max_log_size); + // exactly one is non-nullptr + Logger* const logger_; + LogBuffer* const log_buffer_; + const size_t max_log_size_; // used only for log_buffer_ + // ownership + JSONWriter* json_writer_; +}; + +// here is an example of the output that will show up in the LOG: +// 2015/01/15-14:13:25.788019 1105ef000 EVENT_LOG_v1 {"time_micros": +// 1421360005788015, "event": "table_file_creation", "file_number": 12, +// "file_size": 1909699} +class EventLogger { + public: + static const char* Prefix() { + return "EVENT_LOG_v1"; + } + + explicit EventLogger(Logger* logger) : logger_(logger) {} + EventLoggerStream Log() { return EventLoggerStream(logger_); } + EventLoggerStream LogToBuffer(LogBuffer* log_buffer) { + return EventLoggerStream(log_buffer, LogBuffer::kDefaultMaxLogSize); + } + EventLoggerStream LogToBuffer(LogBuffer* log_buffer, + const size_t max_log_size) { + return EventLoggerStream(log_buffer, max_log_size); + } + void Log(const JSONWriter& jwriter); + static void Log(Logger* logger, const JSONWriter& jwriter); + static void LogToBuffer( + LogBuffer* log_buffer, const JSONWriter& jwriter, + const size_t max_log_size = LogBuffer::kDefaultMaxLogSize); + + private: + Logger* logger_; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/event_logger_test.cc b/src/rocksdb/logging/event_logger_test.cc new file mode 100644 index 000000000..656d73c87 --- /dev/null +++ b/src/rocksdb/logging/event_logger_test.cc @@ -0,0 +1,43 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include <string> + +#include "logging/event_logger.h" +#include "test_util/testharness.h" + +namespace ROCKSDB_NAMESPACE { + +class EventLoggerTest : public testing::Test {}; + +class StringLogger : public Logger { + public: + using Logger::Logv; + void Logv(const char* format, va_list ap) override { + vsnprintf(buffer_, sizeof(buffer_), format, ap); + } + char* buffer() { return buffer_; } + + private: + char buffer_[1000]; +}; + +TEST_F(EventLoggerTest, SimpleTest) { + StringLogger logger; + EventLogger event_logger(&logger); + event_logger.Log() << "id" << 5 << "event" + << "just_testing"; + std::string output(logger.buffer()); + ASSERT_TRUE(output.find("\"event\": \"just_testing\"") != std::string::npos); + ASSERT_TRUE(output.find("\"id\": 5") != std::string::npos); + ASSERT_TRUE(output.find("\"time_micros\"") != std::string::npos); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/src/rocksdb/logging/log_buffer.cc b/src/rocksdb/logging/log_buffer.cc new file mode 100644 index 000000000..242d280b3 --- /dev/null +++ b/src/rocksdb/logging/log_buffer.cc @@ -0,0 +1,92 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "logging/log_buffer.h" + +#include "port/sys_time.h" +#include "port/port.h" + +namespace ROCKSDB_NAMESPACE { + +LogBuffer::LogBuffer(const InfoLogLevel log_level, + Logger*info_log) + : log_level_(log_level), info_log_(info_log) {} + +void LogBuffer::AddLogToBuffer(size_t max_log_size, const char* format, + va_list ap) { + if (!info_log_ || log_level_ < info_log_->GetInfoLogLevel()) { + // Skip the level because of its level. + return; + } + + char* alloc_mem = arena_.AllocateAligned(max_log_size); + BufferedLog* buffered_log = new (alloc_mem) BufferedLog(); + char* p = buffered_log->message; + char* limit = alloc_mem + max_log_size - 1; + + // store the time + gettimeofday(&(buffered_log->now_tv), nullptr); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + auto n = vsnprintf(p, limit - p, format, backup_ap); +#ifndef OS_WIN + // MS reports -1 when the buffer is too short + assert(n >= 0); +#endif + if (n > 0) { + p += n; + } else { + p = limit; + } + va_end(backup_ap); + } + + if (p > limit) { + p = limit; + } + + // Add '\0' to the end + *p = '\0'; + + logs_.push_back(buffered_log); +} + +void LogBuffer::FlushBufferToLog() { + for (BufferedLog* log : logs_) { + const time_t seconds = log->now_tv.tv_sec; + struct tm t; + if (localtime_r(&seconds, &t) != nullptr) { + Log(log_level_, info_log_, + "(Original Log Time %04d/%02d/%02d-%02d:%02d:%02d.%06d) %s", + t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour, t.tm_min, + t.tm_sec, static_cast<int>(log->now_tv.tv_usec), log->message); + } + } + logs_.clear(); +} + +void LogToBuffer(LogBuffer* log_buffer, size_t max_log_size, const char* format, + ...) { + if (log_buffer != nullptr) { + va_list ap; + va_start(ap, format); + log_buffer->AddLogToBuffer(max_log_size, format, ap); + va_end(ap); + } +} + +void LogToBuffer(LogBuffer* log_buffer, const char* format, ...) { + if (log_buffer != nullptr) { + va_list ap; + va_start(ap, format); + log_buffer->AddLogToBuffer(LogBuffer::kDefaultMaxLogSize, format, ap); + va_end(ap); + } +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/log_buffer.h b/src/rocksdb/logging/log_buffer.h new file mode 100644 index 000000000..285256e20 --- /dev/null +++ b/src/rocksdb/logging/log_buffer.h @@ -0,0 +1,56 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include <ctime> +#include "memory/arena.h" +#include "port/sys_time.h" +#include "rocksdb/env.h" +#include "util/autovector.h" + +namespace ROCKSDB_NAMESPACE { + +class Logger; + +// A class to buffer info log entries and flush them in the end. +class LogBuffer { + public: + // log_level: the log level for all the logs + // info_log: logger to write the logs to + LogBuffer(const InfoLogLevel log_level, Logger* info_log); + + // Add a log entry to the buffer. Use default max_log_size. + // max_log_size indicates maximize log size, including some metadata. + void AddLogToBuffer(size_t max_log_size, const char* format, va_list ap); + + size_t IsEmpty() const { return logs_.empty(); } + + // Flush all buffered log to the info log. + void FlushBufferToLog(); + static const size_t kDefaultMaxLogSize = 512; + + private: + // One log entry with its timestamp + struct BufferedLog { + struct timeval now_tv; // Timestamp of the log + char message[1]; // Beginning of log message + }; + + const InfoLogLevel log_level_; + Logger* info_log_; + Arena arena_; + autovector<BufferedLog*> logs_; +}; + +// Add log to the LogBuffer for a delayed info logging. It can be used when +// we want to add some logs inside a mutex. +// max_log_size indicates maximize log size, including some metadata. +extern void LogToBuffer(LogBuffer* log_buffer, size_t max_log_size, + const char* format, ...); +// Same as previous function, but with default max log size. +extern void LogToBuffer(LogBuffer* log_buffer, const char* format, ...); + +} // namespace ROCKSDB_NAMESPACE diff --git a/src/rocksdb/logging/logging.h b/src/rocksdb/logging/logging.h new file mode 100644 index 000000000..178addcc3 --- /dev/null +++ b/src/rocksdb/logging/logging.h @@ -0,0 +1,68 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Must not be included from any .h files to avoid polluting the namespace +// with macros. + +#pragma once + +// Helper macros that include information about file name and line number +#define ROCKS_LOG_STRINGIFY(x) #x +#define ROCKS_LOG_TOSTRING(x) ROCKS_LOG_STRINGIFY(x) +#define ROCKS_LOG_PREPEND_FILE_LINE(FMT) ("[%s:" ROCKS_LOG_TOSTRING(__LINE__) "] " FMT) + +inline const char* RocksLogShorterFileName(const char* file) +{ + // 15 is the length of "logging/logging.h". + // If the name of this file changed, please change this number, too. + return file + (sizeof(__FILE__) > 15 ? sizeof(__FILE__) - 15 : 0); +} + +// Don't inclide file/line info in HEADER level +#define ROCKS_LOG_HEADER(LGR, FMT, ...) \ + ROCKSDB_NAMESPACE::Log(InfoLogLevel::HEADER_LEVEL, LGR, FMT, ##__VA_ARGS__) + +#define ROCKS_LOG_DEBUG(LGR, FMT, ...) \ + ROCKSDB_NAMESPACE::Log(InfoLogLevel::DEBUG_LEVEL, LGR, \ + ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_INFO(LGR, FMT, ...) \ + ROCKSDB_NAMESPACE::Log(InfoLogLevel::INFO_LEVEL, LGR, \ + ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_WARN(LGR, FMT, ...) \ + ROCKSDB_NAMESPACE::Log(InfoLogLevel::WARN_LEVEL, LGR, \ + ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_ERROR(LGR, FMT, ...) \ + ROCKSDB_NAMESPACE::Log(InfoLogLevel::ERROR_LEVEL, LGR, \ + ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_FATAL(LGR, FMT, ...) \ + ROCKSDB_NAMESPACE::Log(InfoLogLevel::FATAL_LEVEL, LGR, \ + ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_BUFFER(LOG_BUF, FMT, ...) \ + ROCKSDB_NAMESPACE::LogToBuffer(LOG_BUF, ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), \ + ##__VA_ARGS__) + +#define ROCKS_LOG_BUFFER_MAX_SZ(LOG_BUF, MAX_LOG_SIZE, FMT, ...) \ + ROCKSDB_NAMESPACE::LogToBuffer( \ + LOG_BUF, MAX_LOG_SIZE, ROCKS_LOG_PREPEND_FILE_LINE(FMT), \ + RocksLogShorterFileName(__FILE__), ##__VA_ARGS__) + +#define ROCKS_LOG_DETAILS(LGR, FMT, ...) \ + ; // due to overhead by default skip such lines +// ROCKS_LOG_DEBUG(LGR, FMT, ##__VA_ARGS__) diff --git a/src/rocksdb/logging/posix_logger.h b/src/rocksdb/logging/posix_logger.h new file mode 100644 index 000000000..fbd297c68 --- /dev/null +++ b/src/rocksdb/logging/posix_logger.h @@ -0,0 +1,185 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Logger implementation that can be shared by all environments +// where enough posix functionality is available. + +#pragma once +#include <algorithm> +#include <stdio.h> +#include "port/sys_time.h" +#include <time.h> +#include <fcntl.h> + +#ifdef OS_LINUX +#ifndef FALLOC_FL_KEEP_SIZE +#include <linux/falloc.h> +#endif +#endif + +#include <atomic> +#include "env/io_posix.h" +#include "monitoring/iostats_context_imp.h" +#include "rocksdb/env.h" +#include "test_util/sync_point.h" + +namespace ROCKSDB_NAMESPACE { + +class PosixLogger : public Logger { + private: + Status PosixCloseHelper() { + int ret; + + ret = fclose(file_); + if (ret) { + return IOError("Unable to close log file", "", ret); + } + return Status::OK(); + } + FILE* file_; + uint64_t (*gettid_)(); // Return the thread id for the current thread + std::atomic_size_t log_size_; + int fd_; + const static uint64_t flush_every_seconds_ = 5; + std::atomic_uint_fast64_t last_flush_micros_; + Env* env_; + std::atomic<bool> flush_pending_; + + protected: + virtual Status CloseImpl() override { return PosixCloseHelper(); } + + public: + PosixLogger(FILE* f, uint64_t (*gettid)(), Env* env, + const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL) + : Logger(log_level), + file_(f), + gettid_(gettid), + log_size_(0), + fd_(fileno(f)), + last_flush_micros_(0), + env_(env), + flush_pending_(false) {} + virtual ~PosixLogger() { + if (!closed_) { + closed_ = true; + PosixCloseHelper(); + } + } + virtual void Flush() override { + TEST_SYNC_POINT("PosixLogger::Flush:Begin1"); + TEST_SYNC_POINT("PosixLogger::Flush:Begin2"); + if (flush_pending_) { + flush_pending_ = false; + fflush(file_); + } + last_flush_micros_ = env_->NowMicros(); + } + + using Logger::Logv; + virtual void Logv(const char* format, va_list ap) override { + IOSTATS_TIMER_GUARD(logger_nanos); + + const uint64_t thread_id = (*gettid_)(); + + // We try twice: the first time with a fixed-size stack allocated buffer, + // and the second time with a much larger dynamically allocated buffer. + char buffer[500]; + for (int iter = 0; iter < 2; iter++) { + char* base; + int bufsize; + if (iter == 0) { + bufsize = sizeof(buffer); + base = buffer; + } else { + bufsize = 65536; + base = new char[bufsize]; + } + char* p = base; + char* limit = base + bufsize; + + struct timeval now_tv; + gettimeofday(&now_tv, nullptr); + const time_t seconds = now_tv.tv_sec; + struct tm t; + localtime_r(&seconds, &t); + p += snprintf(p, limit - p, + "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", + t.tm_year + 1900, + t.tm_mon + 1, + t.tm_mday, + t.tm_hour, + t.tm_min, + t.tm_sec, + static_cast<int>(now_tv.tv_usec), + static_cast<long long unsigned int>(thread_id)); + + // Print the message + if (p < limit) { + va_list backup_ap; + va_copy(backup_ap, ap); + p += vsnprintf(p, limit - p, format, backup_ap); + va_end(backup_ap); + } + + // Truncate to available space if necessary + if (p >= limit) { + if (iter == 0) { + continue; // Try again with larger buffer + } else { + p = limit - 1; + } + } + + // Add newline if necessary + if (p == base || p[-1] != '\n') { + *p++ = '\n'; + } + + assert(p <= limit); + const size_t write_size = p - base; + +#ifdef ROCKSDB_FALLOCATE_PRESENT + const int kDebugLogChunkSize = 128 * 1024; + + // If this write would cross a boundary of kDebugLogChunkSize + // space, pre-allocate more space to avoid overly large + // allocations from filesystem allocsize options. + const size_t log_size = log_size_; + const size_t last_allocation_chunk = + ((kDebugLogChunkSize - 1 + log_size) / kDebugLogChunkSize); + const size_t desired_allocation_chunk = + ((kDebugLogChunkSize - 1 + log_size + write_size) / + kDebugLogChunkSize); + if (last_allocation_chunk != desired_allocation_chunk) { + fallocate( + fd_, FALLOC_FL_KEEP_SIZE, 0, + static_cast<off_t>(desired_allocation_chunk * kDebugLogChunkSize)); + } +#endif + + size_t sz = fwrite(base, 1, write_size, file_); + flush_pending_ = true; + if (sz > 0) { + log_size_ += write_size; + } + uint64_t now_micros = static_cast<uint64_t>(now_tv.tv_sec) * 1000000 + + now_tv.tv_usec; + if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) { + Flush(); + } + if (base != buffer) { + delete[] base; + } + break; + } + } + size_t GetLogFileSize() const override { return log_size_; } +}; + +} // namespace ROCKSDB_NAMESPACE |