diff options
Diffstat (limited to 'src/rocksdb/db/event_helpers.cc')
-rw-r--r-- | src/rocksdb/db/event_helpers.cc | 371 |
1 files changed, 371 insertions, 0 deletions
diff --git a/src/rocksdb/db/event_helpers.cc b/src/rocksdb/db/event_helpers.cc new file mode 100644 index 000000000..7987b8ec6 --- /dev/null +++ b/src/rocksdb/db/event_helpers.cc @@ -0,0 +1,371 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/event_helpers.h" + +#include "rocksdb/convenience.h" +#include "rocksdb/listener.h" +#include "rocksdb/utilities/customizable_util.h" + +namespace ROCKSDB_NAMESPACE { +#ifndef ROCKSDB_LITE +Status EventListener::CreateFromString(const ConfigOptions& config_options, + const std::string& id, + std::shared_ptr<EventListener>* result) { + return LoadSharedObject<EventListener>(config_options, id, nullptr, result); +} +#endif // ROCKSDB_LITE + +namespace { +template <class T> +inline T SafeDivide(T a, T b) { + return b == 0 ? 0 : a / b; +} +} // anonymous namespace + +void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) { + *jwriter << "time_micros" + << std::chrono::duration_cast<std::chrono::microseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); +} + +#ifndef ROCKSDB_LITE +void EventHelpers::NotifyTableFileCreationStarted( + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, TableFileCreationReason reason) { + if (listeners.empty()) { + return; + } + TableFileCreationBriefInfo info; + info.db_name = db_name; + info.cf_name = cf_name; + info.file_path = file_path; + info.job_id = job_id; + info.reason = reason; + for (auto& listener : listeners) { + listener->OnTableFileCreationStarted(info); + } +} +#endif // !ROCKSDB_LITE + +void EventHelpers::NotifyOnBackgroundError( + const std::vector<std::shared_ptr<EventListener>>& listeners, + BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex, + bool* auto_recovery) { +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + db_mutex->AssertHeld(); + // release lock while notifying events + db_mutex->Unlock(); + for (auto& listener : listeners) { + listener->OnBackgroundError(reason, bg_error); + bg_error->PermitUncheckedError(); + if (*auto_recovery) { + listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery); + } + } + db_mutex->Lock(); +#else + (void)listeners; + (void)reason; + (void)bg_error; + (void)db_mutex; + (void)auto_recovery; +#endif // ROCKSDB_LITE +} + +void EventHelpers::LogAndNotifyTableFileCreationFinished( + EventLogger* event_logger, + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, const FileDescriptor& fd, + uint64_t oldest_blob_file_number, const TableProperties& table_properties, + TableFileCreationReason reason, const Status& s, + const std::string& file_checksum, + const std::string& file_checksum_func_name) { + if (s.ok() && event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + jwriter << "cf_name" << cf_name << "job" << job_id << "event" + << "table_file_creation" + << "file_number" << fd.GetNumber() << "file_size" + << fd.GetFileSize() << "file_checksum" + << Slice(file_checksum).ToString(true) << "file_checksum_func_name" + << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno + << "largest_seqno" << fd.largest_seqno; + + // table_properties + { + jwriter << "table_properties"; + jwriter.StartObject(); + + // basic properties: + jwriter << "data_size" << table_properties.data_size << "index_size" + << table_properties.index_size << "index_partitions" + << table_properties.index_partitions << "top_level_index_size" + << table_properties.top_level_index_size + << "index_key_is_user_key" + << table_properties.index_key_is_user_key + << "index_value_is_delta_encoded" + << table_properties.index_value_is_delta_encoded << "filter_size" + << table_properties.filter_size << "raw_key_size" + << table_properties.raw_key_size << "raw_average_key_size" + << SafeDivide(table_properties.raw_key_size, + table_properties.num_entries) + << "raw_value_size" << table_properties.raw_value_size + << "raw_average_value_size" + << SafeDivide(table_properties.raw_value_size, + table_properties.num_entries) + << "num_data_blocks" << table_properties.num_data_blocks + << "num_entries" << table_properties.num_entries + << "num_filter_entries" << table_properties.num_filter_entries + << "num_deletions" << table_properties.num_deletions + << "num_merge_operands" << table_properties.num_merge_operands + << "num_range_deletions" << table_properties.num_range_deletions + << "format_version" << table_properties.format_version + << "fixed_key_len" << table_properties.fixed_key_len + << "filter_policy" << table_properties.filter_policy_name + << "column_family_name" << table_properties.column_family_name + << "column_family_id" << table_properties.column_family_id + << "comparator" << table_properties.comparator_name + << "merge_operator" << table_properties.merge_operator_name + << "prefix_extractor_name" + << table_properties.prefix_extractor_name << "property_collectors" + << table_properties.property_collectors_names << "compression" + << table_properties.compression_name << "compression_options" + << table_properties.compression_options << "creation_time" + << table_properties.creation_time << "oldest_key_time" + << table_properties.oldest_key_time << "file_creation_time" + << table_properties.file_creation_time + << "slow_compression_estimated_data_size" + << table_properties.slow_compression_estimated_data_size + << "fast_compression_estimated_data_size" + << table_properties.fast_compression_estimated_data_size + << "db_id" << table_properties.db_id << "db_session_id" + << table_properties.db_session_id << "orig_file_number" + << table_properties.orig_file_number << "seqno_to_time_mapping"; + + if (table_properties.seqno_to_time_mapping.empty()) { + jwriter << "N/A"; + } else { + SeqnoToTimeMapping tmp; + Status status = tmp.Add(table_properties.seqno_to_time_mapping); + if (status.ok()) { + jwriter << tmp.ToHumanString(); + } else { + jwriter << "Invalid"; + } + } + + // user collected properties + for (const auto& prop : table_properties.readable_properties) { + jwriter << prop.first << prop.second; + } + jwriter.EndObject(); + } + + if (oldest_blob_file_number != kInvalidBlobFileNumber) { + jwriter << "oldest_blob_file_number" << oldest_blob_file_number; + } + + jwriter.EndObject(); + + event_logger->Log(jwriter); + } + +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + TableFileCreationInfo info; + info.db_name = db_name; + info.cf_name = cf_name; + info.file_path = file_path; + info.file_size = fd.file_size; + info.job_id = job_id; + info.table_properties = table_properties; + info.reason = reason; + info.status = s; + info.file_checksum = file_checksum; + info.file_checksum_func_name = file_checksum_func_name; + for (auto& listener : listeners) { + listener->OnTableFileCreated(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)db_name; + (void)cf_name; + (void)file_path; + (void)reason; +#endif // !ROCKSDB_LITE +} + +void EventHelpers::LogAndNotifyTableFileDeletion( + EventLogger* event_logger, int job_id, uint64_t file_number, + const std::string& file_path, const Status& status, + const std::string& dbname, + const std::vector<std::shared_ptr<EventListener>>& listeners) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + + jwriter << "job" << job_id << "event" + << "table_file_deletion" + << "file_number" << file_number; + if (!status.ok()) { + jwriter << "status" << status.ToString(); + } + + jwriter.EndObject(); + + event_logger->Log(jwriter); + +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + TableFileDeletionInfo info; + info.db_name = dbname; + info.job_id = job_id; + info.file_path = file_path; + info.status = status; + for (auto& listener : listeners) { + listener->OnTableFileDeleted(info); + } + info.status.PermitUncheckedError(); +#else + (void)file_path; + (void)dbname; + (void)listeners; +#endif // !ROCKSDB_LITE +} + +void EventHelpers::NotifyOnErrorRecoveryEnd( + const std::vector<std::shared_ptr<EventListener>>& listeners, + const Status& old_bg_error, const Status& new_bg_error, + InstrumentedMutex* db_mutex) { +#ifndef ROCKSDB_LITE + if (!listeners.empty()) { + db_mutex->AssertHeld(); + // release lock while notifying events + db_mutex->Unlock(); + for (auto& listener : listeners) { + BackgroundErrorRecoveryInfo info; + info.old_bg_error = old_bg_error; + info.new_bg_error = new_bg_error; + listener->OnErrorRecoveryCompleted(old_bg_error); + listener->OnErrorRecoveryEnd(info); + info.old_bg_error.PermitUncheckedError(); + info.new_bg_error.PermitUncheckedError(); + } + db_mutex->Lock(); + } +#else + (void)listeners; + (void)old_bg_error; + (void)new_bg_error; + (void)db_mutex; +#endif // ROCKSDB_LITE +} + +#ifndef ROCKSDB_LITE +void EventHelpers::NotifyBlobFileCreationStarted( + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, + BlobFileCreationReason creation_reason) { + if (listeners.empty()) { + return; + } + BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id, + creation_reason); + for (const auto& listener : listeners) { + listener->OnBlobFileCreationStarted(info); + } +} +#endif // !ROCKSDB_LITE + +void EventHelpers::LogAndNotifyBlobFileCreationFinished( + EventLogger* event_logger, + const std::vector<std::shared_ptr<EventListener>>& listeners, + const std::string& db_name, const std::string& cf_name, + const std::string& file_path, int job_id, uint64_t file_number, + BlobFileCreationReason creation_reason, const Status& s, + const std::string& file_checksum, + const std::string& file_checksum_func_name, uint64_t total_blob_count, + uint64_t total_blob_bytes) { + if (s.ok() && event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + jwriter << "cf_name" << cf_name << "job" << job_id << "event" + << "blob_file_creation" + << "file_number" << file_number << "total_blob_count" + << total_blob_count << "total_blob_bytes" << total_blob_bytes + << "file_checksum" << file_checksum << "file_checksum_func_name" + << file_checksum_func_name << "status" << s.ToString(); + + jwriter.EndObject(); + event_logger->Log(jwriter); + } + +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + BlobFileCreationInfo info(db_name, cf_name, file_path, job_id, + creation_reason, total_blob_count, total_blob_bytes, + s, file_checksum, file_checksum_func_name); + for (const auto& listener : listeners) { + listener->OnBlobFileCreated(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)db_name; + (void)file_path; + (void)creation_reason; +#endif +} + +void EventHelpers::LogAndNotifyBlobFileDeletion( + EventLogger* event_logger, + const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id, + uint64_t file_number, const std::string& file_path, const Status& status, + const std::string& dbname) { + if (event_logger) { + JSONWriter jwriter; + AppendCurrentTime(&jwriter); + + jwriter << "job" << job_id << "event" + << "blob_file_deletion" + << "file_number" << file_number; + if (!status.ok()) { + jwriter << "status" << status.ToString(); + } + + jwriter.EndObject(); + event_logger->Log(jwriter); + } +#ifndef ROCKSDB_LITE + if (listeners.empty()) { + return; + } + BlobFileDeletionInfo info(dbname, file_path, job_id, status); + for (const auto& listener : listeners) { + listener->OnBlobFileDeleted(info); + } + info.status.PermitUncheckedError(); +#else + (void)listeners; + (void)dbname; + (void)file_path; +#endif // !ROCKSDB_LITE +} + +} // namespace ROCKSDB_NAMESPACE |