From c21c3b0befeb46a51b6bf3758ffa30813bea0ff0 Mon Sep 17 00:00:00 2001 From: Daniel Baumann Date: Sat, 9 Mar 2024 14:19:22 +0100 Subject: Adding upstream version 1.44.3. Signed-off-by: Daniel Baumann --- .../systemd-journal.plugin/systemd-journal.c | 2168 +++++++------------- 1 file changed, 758 insertions(+), 1410 deletions(-) (limited to 'collectors/systemd-journal.plugin/systemd-journal.c') diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 877371120..f812b2161 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -5,13 +5,7 @@ * GPL v3+ */ -#include "collectors/all.h" -#include "libnetdata/libnetdata.h" -#include "libnetdata/required_dummies.h" - -#include -#include -#include +#include "systemd-internals.h" /* * TODO @@ -20,95 +14,17 @@ * */ - -// ---------------------------------------------------------------------------- -// fstat64 overloading to speed up libsystemd -// https://github.com/systemd/systemd/pull/29261 - -#define ND_SD_JOURNAL_OPEN_FLAGS (0) - -#include -#include - -#define FSTAT_CACHE_MAX 1024 -struct fdstat64_cache_entry { - bool enabled; - bool updated; - int err_no; - struct stat64 stat; - int ret; - size_t cached_count; - size_t session; -}; - -struct fdstat64_cache_entry fstat64_cache[FSTAT_CACHE_MAX] = {0 }; -static __thread size_t fstat_thread_calls = 0; -static __thread size_t fstat_thread_cached_responses = 0; -static __thread bool enable_thread_fstat = false; -static __thread size_t fstat_caching_thread_session = 0; -static size_t fstat_caching_global_session = 0; - -static void fstat_cache_enable_on_thread(void) { - fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_ACQUIRE); - enable_thread_fstat = true; -} - -static void fstat_cache_disable_on_thread(void) { - fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_RELEASE); - enable_thread_fstat = false; -} - -int fstat64(int fd, struct stat64 *buf) { - static int (*real_fstat)(int, struct stat64 *) = NULL; - if (!real_fstat) - real_fstat = dlsym(RTLD_NEXT, "fstat64"); - - fstat_thread_calls++; - - if(fd >= 0 && fd < FSTAT_CACHE_MAX) { - if(enable_thread_fstat && fstat64_cache[fd].session != fstat_caching_thread_session) { - fstat64_cache[fd].session = fstat_caching_thread_session; - fstat64_cache[fd].enabled = true; - fstat64_cache[fd].updated = false; - } - - if(fstat64_cache[fd].enabled && fstat64_cache[fd].updated && fstat64_cache[fd].session == fstat_caching_thread_session) { - fstat_thread_cached_responses++; - errno = fstat64_cache[fd].err_no; - *buf = fstat64_cache[fd].stat; - fstat64_cache[fd].cached_count++; - return fstat64_cache[fd].ret; - } - } - - int ret = real_fstat(fd, buf); - - if(fd >= 0 && fd < FSTAT_CACHE_MAX && fstat64_cache[fd].enabled) { - fstat64_cache[fd].ret = ret; - fstat64_cache[fd].updated = true; - fstat64_cache[fd].err_no = errno; - fstat64_cache[fd].stat = *buf; - fstat64_cache[fd].session = fstat_caching_thread_session; - } - - return ret; -} - -// ---------------------------------------------------------------------------- - #define FACET_MAX_VALUE_LENGTH 8192 -#define SYSTEMD_JOURNAL_MAX_SOURCE_LEN 64 #define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries." #define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal" #define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60 -#define SYSTEMD_JOURNAL_MAX_PARAMS 100 +#define SYSTEMD_JOURNAL_MAX_PARAMS 1000 #define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600) #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200 -#define SYSTEMD_JOURNAL_WORKER_THREADS 5 - -#define JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT (5 * USEC_PER_SEC) // assume always 5 seconds latency -#define JOURNAL_VS_REALTIME_DELTA_MAX_UT (2 * 60 * USEC_PER_SEC) // up to 2 minutes latency +#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING 1000000 +#define SYSTEMD_JOURNAL_SAMPLING_SLOTS 1000 +#define SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE 10000 #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" @@ -128,6 +44,7 @@ int fstat64(int fd, struct stat64 *buf) { #define JOURNAL_PARAMETER_SLICE "slice" #define JOURNAL_PARAMETER_DELTA "delta" #define JOURNAL_PARAMETER_TAIL "tail" +#define JOURNAL_PARAMETER_SAMPLING "sampling" #define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE" #define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS" @@ -138,7 +55,8 @@ int fstat64(int fd, struct stat64 *buf) { #define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL #define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \ - "*MESSAGE*" \ + "!MESSAGE_ID" \ + "|*MESSAGE*" \ "|*_RAW" \ "|*_USEC" \ "|*_NSEC" \ @@ -153,7 +71,7 @@ int fstat64(int fd, struct stat64 *buf) { /* --- USER JOURNAL FIELDS --- */ \ \ /* "|MESSAGE" */ \ - /* "|MESSAGE_ID" */ \ + "|MESSAGE_ID" \ "|PRIORITY" \ "|CODE_FILE" \ /* "|CODE_LINE" */ \ @@ -247,33 +165,22 @@ int fstat64(int fd, struct stat64 *buf) { "|IMAGE_NAME" /* undocumented */ \ /* "|CONTAINER_PARTIAL_MESSAGE" */ \ \ + \ + /* --- NETDATA --- */ \ + \ + "|ND_NIDL_NODE" \ + "|ND_NIDL_CONTEXT" \ + "|ND_LOG_SOURCE" \ + /*"|ND_MODULE" */ \ + "|ND_ALERT_NAME" \ + "|ND_ALERT_CLASS" \ + "|ND_ALERT_COMPONENT" \ + "|ND_ALERT_TYPE" \ + \ "" -static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; -static bool plugin_should_exit = false; - // ---------------------------------------------------------------------------- -typedef enum { - ND_SD_JOURNAL_NO_FILE_MATCHED, - ND_SD_JOURNAL_FAILED_TO_OPEN, - ND_SD_JOURNAL_FAILED_TO_SEEK, - ND_SD_JOURNAL_TIMED_OUT, - ND_SD_JOURNAL_OK, - ND_SD_JOURNAL_NOT_MODIFIED, - ND_SD_JOURNAL_CANCELLED, -} ND_SD_JOURNAL_STATUS; - -typedef enum { - SDJF_ALL = 0, - SDJF_LOCAL = (1 << 0), - SDJF_REMOTE = (1 << 1), - SDJF_SYSTEM = (1 << 2), - SDJF_USER = (1 << 3), - SDJF_NAMESPACE = (1 << 4), - SDJF_OTHER = (1 << 5), -} SD_JOURNAL_FILE_SOURCE_TYPE; - typedef struct function_query_status { bool *cancelled; // a pointer to the cancelling boolean usec_t stop_monotonic_ut; @@ -282,7 +189,7 @@ typedef struct function_query_status { // request SD_JOURNAL_FILE_SOURCE_TYPE source_type; - STRING *source; + SIMPLE_PATTERN *sources; usec_t after_ut; usec_t before_ut; @@ -298,13 +205,50 @@ typedef struct function_query_status { bool tail; bool data_only; bool slice; + size_t sampling; size_t filters; usec_t last_modified; const char *query; const char *histogram; + struct { + usec_t start_ut; // the starting time of the query - we start from this + usec_t stop_ut; // the ending time of the query - we stop at this + usec_t first_msg_ut; + + sd_id128_t first_msg_writer; + uint64_t first_msg_seqnum; + } query_file; + + struct { + uint32_t enable_after_samples; + uint32_t slots; + uint32_t sampled; + uint32_t unsampled; + uint32_t estimated; + } samples; + + struct { + uint32_t enable_after_samples; + uint32_t every; + uint32_t skipped; + uint32_t recalibrate; + uint32_t sampled; + uint32_t unsampled; + uint32_t estimated; + } samples_per_file; + + struct { + usec_t start_ut; + usec_t end_ut; + usec_t step_ut; + uint32_t enable_after_samples; + uint32_t sampled[SYSTEMD_JOURNAL_SAMPLING_SLOTS]; + uint32_t unsampled[SYSTEMD_JOURNAL_SAMPLING_SLOTS]; + } samples_per_time_slot; + // per file progress info - size_t cached_count; + // size_t cached_count; // progress statistics usec_t matches_setup_ut; @@ -315,20 +259,6 @@ typedef struct function_query_status { size_t file_working; } FUNCTION_QUERY_STATUS; -struct journal_file { - const char *filename; - size_t filename_len; - STRING *source; - SD_JOURNAL_FILE_SOURCE_TYPE source_type; - usec_t file_last_modified_ut; - usec_t msg_first_ut; - usec_t msg_last_ut; - usec_t last_scan_ut; - size_t size; - bool logged_failure; - usec_t max_journal_vs_realtime_delta_ut; -}; - static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) { netdata_log_error("ERROR: %s, on query " "timeframe [%"PRIu64" - %"PRIu64"], " @@ -359,25 +289,369 @@ static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timesta #define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP" -static inline bool parse_journal_field(const char *data, size_t data_length, const char **key, size_t *key_length, const char **value, size_t *value_length) { - const char *k = data; - const char *equal = strchr(k, '='); - if(unlikely(!equal)) - return false; +// ---------------------------------------------------------------------------- +// sampling support - size_t kl = equal - k; +static void sampling_query_init(FUNCTION_QUERY_STATUS *fqs, FACETS *facets) { + if(!fqs->sampling) + return; - const char *v = ++equal; - size_t vl = data_length - kl - 1; + if(!fqs->slice) { + // the user is doing a full data query + // disable sampling + fqs->sampling = 0; + return; + } - *key = k; - *key_length = kl; - *value = v; - *value_length = vl; + if(fqs->data_only) { + // the user is doing a data query + // disable sampling + fqs->sampling = 0; + return; + } - return true; + if(!fqs->files_matched) { + // no files have been matched + // disable sampling + fqs->sampling = 0; + return; + } + + fqs->samples.slots = facets_histogram_slots(facets); + if(fqs->samples.slots < 2) fqs->samples.slots = 2; + if(fqs->samples.slots > SYSTEMD_JOURNAL_SAMPLING_SLOTS) + fqs->samples.slots = SYSTEMD_JOURNAL_SAMPLING_SLOTS; + + if(!fqs->after_ut || !fqs->before_ut || fqs->after_ut >= fqs->before_ut) { + // we don't have enough information for sampling + fqs->sampling = 0; + return; + } + + usec_t delta = fqs->before_ut - fqs->after_ut; + usec_t step = delta / facets_histogram_slots(facets) - 1; + if(step < 1) step = 1; + + fqs->samples_per_time_slot.start_ut = fqs->after_ut; + fqs->samples_per_time_slot.end_ut = fqs->before_ut; + fqs->samples_per_time_slot.step_ut = step; + + // the minimum number of rows to enable sampling + fqs->samples.enable_after_samples = fqs->sampling / 2; + + size_t files_matched = fqs->files_matched; + if(!files_matched) + files_matched = 1; + + // the minimum number of rows per file to enable sampling + fqs->samples_per_file.enable_after_samples = (fqs->sampling / 4) / files_matched; + if(fqs->samples_per_file.enable_after_samples < fqs->entries) + fqs->samples_per_file.enable_after_samples = fqs->entries; + + // the minimum number of rows per time slot to enable sampling + fqs->samples_per_time_slot.enable_after_samples = (fqs->sampling / 4) / fqs->samples.slots; + if(fqs->samples_per_time_slot.enable_after_samples < fqs->entries) + fqs->samples_per_time_slot.enable_after_samples = fqs->entries; } +static void sampling_file_init(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf __maybe_unused) { + fqs->samples_per_file.sampled = 0; + fqs->samples_per_file.unsampled = 0; + fqs->samples_per_file.estimated = 0; + fqs->samples_per_file.every = 0; + fqs->samples_per_file.skipped = 0; + fqs->samples_per_file.recalibrate = 0; +} + +static size_t sampling_file_lines_scanned_so_far(FUNCTION_QUERY_STATUS *fqs) { + size_t sampled = fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled; + if(!sampled) sampled = 1; + return sampled; +} + +static void sampling_running_file_query_overlapping_timeframe_ut( + FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, + usec_t msg_ut, usec_t *after_ut, usec_t *before_ut) { + + // find the overlap of the query and file timeframes + // taking into account the first message we encountered + + usec_t oldest_ut, newest_ut; + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) { + // the first message we know (oldest) + oldest_ut = fqs->query_file.first_msg_ut ? fqs->query_file.first_msg_ut : jf->msg_first_ut; + if(!oldest_ut) oldest_ut = fqs->query_file.start_ut; + + if(jf->msg_last_ut) + newest_ut = MIN(fqs->query_file.stop_ut, jf->msg_last_ut); + else if(jf->file_last_modified_ut) + newest_ut = MIN(fqs->query_file.stop_ut, jf->file_last_modified_ut); + else + newest_ut = fqs->query_file.stop_ut; + + if(msg_ut < oldest_ut) + oldest_ut = msg_ut - 1; + } + else /* BACKWARD */ { + // the latest message we know (newest) + newest_ut = fqs->query_file.first_msg_ut ? fqs->query_file.first_msg_ut : jf->msg_last_ut; + if(!newest_ut) newest_ut = fqs->query_file.start_ut; + + if(jf->msg_first_ut) + oldest_ut = MAX(fqs->query_file.stop_ut, jf->msg_first_ut); + else + oldest_ut = fqs->query_file.stop_ut; + + if(newest_ut < msg_ut) + newest_ut = msg_ut + 1; + } + + *after_ut = oldest_ut; + *before_ut = newest_ut; +} + +static double sampling_running_file_query_progress_by_time(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { + + usec_t after_ut, before_ut, elapsed_ut; + sampling_running_file_query_overlapping_timeframe_ut(fqs, jf, direction, msg_ut, &after_ut, &before_ut); + + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) + elapsed_ut = msg_ut - after_ut; + else + elapsed_ut = before_ut - msg_ut; + + usec_t total_ut = before_ut - after_ut; + double progress = (double)elapsed_ut / (double)total_ut; + + return progress; +} + +static usec_t sampling_running_file_query_remaining_time(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut, + usec_t *total_time_ut, usec_t *remaining_start_ut, + usec_t *remaining_end_ut) { + usec_t after_ut, before_ut; + sampling_running_file_query_overlapping_timeframe_ut(fqs, jf, direction, msg_ut, &after_ut, &before_ut); + + // since we have a timestamp in msg_ut + // this timestamp can extend the overlap + if(msg_ut <= after_ut) + after_ut = msg_ut - 1; + + if(msg_ut >= before_ut) + before_ut = msg_ut + 1; + + // return the remaining duration + usec_t remaining_from_ut, remaining_to_ut; + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) { + remaining_from_ut = msg_ut; + remaining_to_ut = before_ut; + } + else { + remaining_from_ut = after_ut; + remaining_to_ut = msg_ut; + } + + usec_t remaining_ut = remaining_to_ut - remaining_from_ut; + + if(total_time_ut) + *total_time_ut = (before_ut > after_ut) ? before_ut - after_ut : 1; + + if(remaining_start_ut) + *remaining_start_ut = remaining_from_ut; + + if(remaining_end_ut) + *remaining_end_ut = remaining_to_ut; + + return remaining_ut; +} + +static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCTION_QUERY_STATUS *fqs, + struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, + usec_t msg_ut) { + size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs); + + // Calculate the proportion of time covered + usec_t total_time_ut, remaining_start_ut, remaining_end_ut; + usec_t remaining_time_ut = sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut, &total_time_ut, + &remaining_start_ut, &remaining_end_ut); + if (total_time_ut == 0) total_time_ut = 1; + + double proportion_by_time = (double) (total_time_ut - remaining_time_ut) / (double) total_time_ut; + + if (proportion_by_time == 0 || proportion_by_time > 1.0 || !isfinite(proportion_by_time)) + proportion_by_time = 1.0; + + // Estimate the total number of lines in the file + size_t expected_matching_logs_by_time = (size_t)((double)scanned_lines / proportion_by_time); + + if(jf->messages_in_file && expected_matching_logs_by_time > jf->messages_in_file) + expected_matching_logs_by_time = jf->messages_in_file; + + // Calculate the estimated number of remaining lines + size_t remaining_logs_by_time = expected_matching_logs_by_time - scanned_lines; + if (remaining_logs_by_time < 1) remaining_logs_by_time = 1; + +// nd_log(NDLS_COLLECTORS, NDLP_INFO, +// "JOURNAL ESTIMATION: '%s' " +// "scanned_lines=%zu [sampled=%zu, unsampled=%zu, estimated=%zu], " +// "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], " +// "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], " +// "first message read from the file at %"PRIu64", current message at %"PRIu64", " +// "proportion of time %.2f %%, " +// "expected total lines in file %zu, " +// "remaining lines %zu, " +// "remaining time %"PRIu64" [%"PRIu64" - %"PRIu64", duration %"PRId64"]" +// , jf->filename +// , scanned_lines, fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated +// , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file +// , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut +// , fqs->query_file.first_msg_ut, msg_ut +// , proportion_by_time * 100.0 +// , expected_matching_logs_by_time +// , remaining_logs_by_time +// , remaining_time_ut, remaining_start_ut, remaining_end_ut, remaining_end_ut - remaining_start_ut +// ); + + return remaining_logs_by_time; +} + +static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { + size_t expected_matching_logs_by_seqnum = 0; + double proportion_by_seqnum = 0.0; + size_t remaining_logs_by_seqnum = 0; + +#ifdef HAVE_SD_JOURNAL_GET_SEQNUM + uint64_t current_msg_seqnum; + sd_id128_t current_msg_writer; + if(!fqs->query_file.first_msg_seqnum || sd_journal_get_seqnum(j, ¤t_msg_seqnum, ¤t_msg_writer) < 0) { + fqs->query_file.first_msg_seqnum = 0; + fqs->query_file.first_msg_writer = SD_ID128_NULL; + } + else if(jf->messages_in_file) { + size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs); + + double proportion_of_all_lines_so_far; + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) + proportion_of_all_lines_so_far = (double)scanned_lines / (double)(current_msg_seqnum - jf->first_seqnum); + else + proportion_of_all_lines_so_far = (double)scanned_lines / (double)(jf->last_seqnum - current_msg_seqnum); + + if(proportion_of_all_lines_so_far > 1.0) + proportion_of_all_lines_so_far = 1.0; + + expected_matching_logs_by_seqnum = (size_t)(proportion_of_all_lines_so_far * (double)jf->messages_in_file); + + proportion_by_seqnum = (double)scanned_lines / (double)expected_matching_logs_by_seqnum; + + if (proportion_by_seqnum == 0 || proportion_by_seqnum > 1.0 || !isfinite(proportion_by_seqnum)) + proportion_by_seqnum = 1.0; + + remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines; + if(!remaining_logs_by_seqnum) remaining_logs_by_seqnum = 1; + } +#endif + + if(remaining_logs_by_seqnum) + return remaining_logs_by_seqnum; + + return sampling_running_file_query_estimate_remaining_lines_by_time(fqs, jf, direction, msg_ut); +} + +static void sampling_decide_file_sampling_every(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { + size_t files_matched = fqs->files_matched; + if(!files_matched) files_matched = 1; + + size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, fqs, jf, direction, msg_ut); + size_t wanted_samples = (fqs->sampling / 2) / files_matched; + if(!wanted_samples) wanted_samples = 1; + + fqs->samples_per_file.every = remaining_lines / wanted_samples; + + if(fqs->samples_per_file.every < 1) + fqs->samples_per_file.every = 1; +} + +typedef enum { + SAMPLING_STOP_AND_ESTIMATE = -1, + SAMPLING_FULL = 0, + SAMPLING_SKIP_FIELDS = 1, +} sampling_t; + +static inline sampling_t is_row_in_sample(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction, bool candidate_to_keep) { + if(!fqs->sampling || candidate_to_keep) + return SAMPLING_FULL; + + if(unlikely(msg_ut < fqs->samples_per_time_slot.start_ut)) + msg_ut = fqs->samples_per_time_slot.start_ut; + if(unlikely(msg_ut > fqs->samples_per_time_slot.end_ut)) + msg_ut = fqs->samples_per_time_slot.end_ut; + + size_t slot = (msg_ut - fqs->samples_per_time_slot.start_ut) / fqs->samples_per_time_slot.step_ut; + if(slot >= fqs->samples.slots) + slot = fqs->samples.slots - 1; + + bool should_sample = false; + + if(fqs->samples.sampled < fqs->samples.enable_after_samples || + fqs->samples_per_file.sampled < fqs->samples_per_file.enable_after_samples || + fqs->samples_per_time_slot.sampled[slot] < fqs->samples_per_time_slot.enable_after_samples) + should_sample = true; + + else if(fqs->samples_per_file.recalibrate >= SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE || !fqs->samples_per_file.every) { + // this is the first to be unsampled for this file + sampling_decide_file_sampling_every(j, fqs, jf, direction, msg_ut); + fqs->samples_per_file.recalibrate = 0; + should_sample = true; + } + else { + // we sample 1 every fqs->samples_per_file.every + if(fqs->samples_per_file.skipped >= fqs->samples_per_file.every) { + fqs->samples_per_file.skipped = 0; + should_sample = true; + } + else + fqs->samples_per_file.skipped++; + } + + if(should_sample) { + fqs->samples.sampled++; + fqs->samples_per_file.sampled++; + fqs->samples_per_time_slot.sampled[slot]++; + + return SAMPLING_FULL; + } + + fqs->samples_per_file.recalibrate++; + + fqs->samples.unsampled++; + fqs->samples_per_file.unsampled++; + fqs->samples_per_time_slot.unsampled[slot]++; + + if(fqs->samples_per_file.unsampled > fqs->samples_per_file.sampled) { + double progress_by_time = sampling_running_file_query_progress_by_time(fqs, jf, direction, msg_ut); + + if(progress_by_time > SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE) + return SAMPLING_STOP_AND_ESTIMATE; + } + + return SAMPLING_SKIP_FIELDS; +} + +static void sampling_update_running_query_file_estimates(FACETS *facets, sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction) { + usec_t total_time_ut, remaining_start_ut, remaining_end_ut; + sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut, &total_time_ut, &remaining_start_ut, + &remaining_end_ut); + size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, fqs, jf, direction, msg_ut); + facets_update_estimations(facets, remaining_start_ut, remaining_end_ut, remaining_lines); + fqs->samples.estimated += remaining_lines; + fqs->samples_per_file.estimated += remaining_lines; +} + +// ---------------------------------------------------------------------------- + static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) { const void *data; size_t length, bytes = 0; @@ -454,11 +728,15 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward( usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut; bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut); + fqs->query_file.start_ut = start_ut; + fqs->query_file.stop_ut = stop_ut; + if(!netdata_systemd_journal_seek_to(j, start_ut)) return ND_SD_JOURNAL_FAILED_TO_SEEK; size_t errors_no_timestamp = 0; - usec_t earliest_msg_ut = 0; + usec_t latest_msg_ut = 0; // the biggest timestamp we have seen so far + usec_t first_msg_ut = 0; // the first message we got from the db size_t row_counter = 0, last_row_counter = 0, rows_useful = 0; size_t bytes = 0, last_bytes = 0; @@ -475,44 +753,68 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward( continue; } - if(unlikely(msg_ut > earliest_msg_ut)) - earliest_msg_ut = msg_ut; - if (unlikely(msg_ut > start_ut)) continue; if (unlikely(msg_ut < stop_ut)) break; - bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + if(unlikely(msg_ut > latest_msg_ut)) + latest_msg_ut = msg_ut; - // make sure each line gets a unique timestamp - if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) - msg_ut = --last_usec_from; - else - last_usec_from = last_usec_to = msg_ut; - - if(facets_row_finished(facets, msg_ut)) - rows_useful++; - - row_counter++; - if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && - stop_when_full && - facets_rows(facets) >= fqs->entries)) { - // stop the data only query - usec_t oldest = facets_row_oldest_ut(facets); - if(oldest && msg_ut < (oldest - anchor_delta)) - break; + if(unlikely(!first_msg_ut)) { + first_msg_ut = msg_ut; + fqs->query_file.first_msg_ut = msg_ut; + +#ifdef HAVE_SD_JOURNAL_GET_SEQNUM + if(sd_journal_get_seqnum(j, &fqs->query_file.first_msg_seqnum, &fqs->query_file.first_msg_writer) < 0) { + fqs->query_file.first_msg_seqnum = 0; + fqs->query_file.first_msg_writer = SD_ID128_NULL; + } +#endif } - if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { - FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); - last_row_counter = row_counter; + sampling_t sample = is_row_in_sample(j, fqs, jf, msg_ut, + FACETS_ANCHOR_DIRECTION_BACKWARD, + facets_row_candidate_to_keep(facets, msg_ut)); + + if(sample == SAMPLING_FULL) { + bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + + // make sure each line gets a unique timestamp + if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) + msg_ut = --last_usec_from; + else + last_usec_from = last_usec_to = msg_ut; + + if(facets_row_finished(facets, msg_ut)) + rows_useful++; + + row_counter++; + if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && + stop_when_full && + facets_rows(facets) >= fqs->entries)) { + // stop the data only query + usec_t oldest = facets_row_oldest_ut(facets); + if(oldest && msg_ut < (oldest - anchor_delta)) + break; + } + + if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + last_row_counter = row_counter; - FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); - last_bytes = bytes; + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + last_bytes = bytes; - status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + } + } + else if(sample == SAMPLING_SKIP_FIELDS) + facets_row_finished_unsampled(facets, msg_ut); + else { + sampling_update_running_query_file_estimates(facets, j, fqs, jf, msg_ut, FACETS_ANCHOR_DIRECTION_BACKWARD); + break; } } @@ -524,8 +826,8 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward( if(errors_no_timestamp) netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); - if(earliest_msg_ut > fqs->last_modified) - fqs->last_modified = earliest_msg_ut; + if(latest_msg_ut > fqs->last_modified) + fqs->last_modified = latest_msg_ut; return status; } @@ -540,11 +842,15 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward( usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta; bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut); + fqs->query_file.start_ut = start_ut; + fqs->query_file.stop_ut = stop_ut; + if(!netdata_systemd_journal_seek_to(j, start_ut)) return ND_SD_JOURNAL_FAILED_TO_SEEK; size_t errors_no_timestamp = 0; - usec_t earliest_msg_ut = 0; + usec_t latest_msg_ut = 0; // the biggest timestamp we have seen so far + usec_t first_msg_ut = 0; // the first message we got from the db size_t row_counter = 0, last_row_counter = 0, rows_useful = 0; size_t bytes = 0, last_bytes = 0; @@ -561,44 +867,61 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward( continue; } - if(likely(msg_ut > earliest_msg_ut)) - earliest_msg_ut = msg_ut; - if (unlikely(msg_ut < start_ut)) continue; if (unlikely(msg_ut > stop_ut)) break; - bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + if(likely(msg_ut > latest_msg_ut)) + latest_msg_ut = msg_ut; - // make sure each line gets a unique timestamp - if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) - msg_ut = ++last_usec_to; - else - last_usec_from = last_usec_to = msg_ut; - - if(facets_row_finished(facets, msg_ut)) - rows_useful++; - - row_counter++; - if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && - stop_when_full && - facets_rows(facets) >= fqs->entries)) { - // stop the data only query - usec_t newest = facets_row_newest_ut(facets); - if(newest && msg_ut > (newest + anchor_delta)) - break; + if(unlikely(!first_msg_ut)) { + first_msg_ut = msg_ut; + fqs->query_file.first_msg_ut = msg_ut; } - if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { - FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); - last_row_counter = row_counter; + sampling_t sample = is_row_in_sample(j, fqs, jf, msg_ut, + FACETS_ANCHOR_DIRECTION_FORWARD, + facets_row_candidate_to_keep(facets, msg_ut)); + + if(sample == SAMPLING_FULL) { + bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + + // make sure each line gets a unique timestamp + if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) + msg_ut = ++last_usec_to; + else + last_usec_from = last_usec_to = msg_ut; + + if(facets_row_finished(facets, msg_ut)) + rows_useful++; + + row_counter++; + if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && + stop_when_full && + facets_rows(facets) >= fqs->entries)) { + // stop the data only query + usec_t newest = facets_row_newest_ut(facets); + if(newest && msg_ut > (newest + anchor_delta)) + break; + } - FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); - last_bytes = bytes; + if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + last_row_counter = row_counter; - status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + last_bytes = bytes; + + status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + } + } + else if(sample == SAMPLING_SKIP_FIELDS) + facets_row_finished_unsampled(facets, msg_ut); + else { + sampling_update_running_query_file_estimates(facets, j, fqs, jf, msg_ut, FACETS_ANCHOR_DIRECTION_FORWARD); + break; } } @@ -610,8 +933,8 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward( if(errors_no_timestamp) netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); - if(earliest_msg_ut > fqs->last_modified) - fqs->last_modified = earliest_msg_ut; + if(latest_msg_ut > fqs->last_modified) + fqs->last_modified = latest_msg_ut; return status; } @@ -723,6 +1046,7 @@ static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file( }; if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) { + netdata_log_error("JOURNAL: cannot open file '%s' for query", filename); fstat_cache_disable_on_thread(); return ND_SD_JOURNAL_FAILED_TO_OPEN; } @@ -756,463 +1080,25 @@ static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file( return status; } -// ---------------------------------------------------------------------------- -// journal files registry - -#define VAR_LOG_JOURNAL_MAX_DEPTH 10 -#define MAX_JOURNAL_DIRECTORIES 100 - -struct journal_directory { - char *path; - bool logged_failure; -}; - -static struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES] = { 0 }; -static DICTIONARY *journal_files_registry = NULL; -static DICTIONARY *used_hashes_registry = NULL; - -static usec_t systemd_journal_session = 0; - -static void buffer_json_journal_versions(BUFFER *wb) { - buffer_json_member_add_object(wb, "versions"); - { - buffer_json_member_add_uint64(wb, "sources", - systemd_journal_session + dictionary_version(journal_files_registry)); - } - buffer_json_object_close(wb); -} - -static void journal_file_update_msg_ut(const char *filename, struct journal_file *jf) { - fstat_cache_enable_on_thread(); - - const char *files[2] = { - [0] = filename, - [1] = NULL, - }; - - sd_journal *j = NULL; - if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) { - fstat_cache_disable_on_thread(); - - if(!jf->logged_failure) { - netdata_log_error("cannot open journal file '%s', using file timestamps to understand time-frame.", filename); - jf->logged_failure = true; - } - - jf->msg_first_ut = 0; - jf->msg_last_ut = jf->file_last_modified_ut; - return; - } - - usec_t first_ut = 0, last_ut = 0; - - if(sd_journal_seek_head(j) < 0 || sd_journal_next(j) < 0 || sd_journal_get_realtime_usec(j, &first_ut) < 0 || !first_ut) { - internal_error(true, "cannot find the timestamp of the first message in '%s'", filename); - first_ut = 0; - } - - if(sd_journal_seek_tail(j) < 0 || sd_journal_previous(j) < 0 || sd_journal_get_realtime_usec(j, &last_ut) < 0 || !last_ut) { - internal_error(true, "cannot find the timestamp of the last message in '%s'", filename); - last_ut = jf->file_last_modified_ut; - } - - sd_journal_close(j); - fstat_cache_disable_on_thread(); - - if(first_ut > last_ut) { - internal_error(true, "timestamps are flipped in file '%s'", filename); - usec_t t = first_ut; - first_ut = last_ut; - last_ut = t; - } - - jf->msg_first_ut = first_ut; - jf->msg_last_ut = last_ut; -} - -static STRING *string_strdupz_source(const char *s, const char *e, size_t max_len, const char *prefix) { - char buf[max_len]; - size_t len; - char *dst = buf; - - if(prefix) { - len = strlen(prefix); - memcpy(buf, prefix, len); - dst = &buf[len]; - max_len -= len; - } - - len = e - s; - if(len >= max_len) - len = max_len - 1; - memcpy(dst, s, len); - dst[len] = '\0'; - buf[max_len - 1] = '\0'; - - for(size_t i = 0; buf[i] ;i++) - if(!isalnum(buf[i]) && buf[i] != '-' && buf[i] != '.' && buf[i] != ':') - buf[i] = '_'; - - return string_strdupz(buf); -} - -static void files_registry_insert_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) { - struct journal_file *jf = value; - jf->filename = dictionary_acquired_item_name(item); - jf->filename_len = strlen(jf->filename); - - // based on the filename - // decide the source to show to the user - const char *s = strrchr(jf->filename, '/'); - if(s) { - if(strstr(jf->filename, "/remote/")) - jf->source_type = SDJF_REMOTE; - else { - const char *t = s - 1; - while(t >= jf->filename && *t != '.' && *t != '/') - t--; - - if(t >= jf->filename && *t == '.') { - jf->source_type = SDJF_NAMESPACE; - jf->source = string_strdupz_source(t + 1, s, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "namespace-"); - } - else - jf->source_type = SDJF_LOCAL; - } - - if(strncmp(s, "/system", 7) == 0) - jf->source_type |= SDJF_SYSTEM; - - else if(strncmp(s, "/user", 5) == 0) - jf->source_type |= SDJF_USER; - - else if(strncmp(s, "/remote-", 8) == 0) { - jf->source_type |= SDJF_REMOTE; - - s = &s[8]; // skip "/remote-" - - char *e = strchr(s, '@'); - if(!e) - e = strstr(s, ".journal"); - - if(e) { - const char *d = s; - for(; d < e && (isdigit(*d) || *d == '.' || *d == ':') ; d++) ; - if(d == e) { - // a valid IP address - char ip[e - s + 1]; - memcpy(ip, s, e - s); - ip[e - s] = '\0'; - char buf[SYSTEMD_JOURNAL_MAX_SOURCE_LEN]; - if(ip_to_hostname(ip, buf, sizeof(buf))) - jf->source = string_strdupz_source(buf, &buf[strlen(buf)], SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); - else { - internal_error(true, "Cannot find the hostname for IP '%s'", ip); - jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); - } - } - else - jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); - } - else - jf->source_type |= SDJF_OTHER; - } - else - jf->source_type |= SDJF_OTHER; - } - else - jf->source_type = SDJF_LOCAL | SDJF_OTHER; - - journal_file_update_msg_ut(jf->filename, jf); - - internal_error(true, - "found journal file '%s', type %d, source '%s', " - "file modified: %"PRIu64", " - "msg {first: %"PRIu64", last: %"PRIu64"}", - jf->filename, jf->source_type, jf->source ? string2str(jf->source) : "", - jf->file_last_modified_ut, - jf->msg_first_ut, jf->msg_last_ut); -} - -static bool files_registry_conflict_cb(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *data __maybe_unused) { - struct journal_file *jf = old_value; - struct journal_file *njf = new_value; - - if(njf->last_scan_ut > jf->last_scan_ut) - jf->last_scan_ut = njf->last_scan_ut; - - if(njf->file_last_modified_ut > jf->file_last_modified_ut) { - jf->file_last_modified_ut = njf->file_last_modified_ut; - jf->size = njf->size; - - const char *filename = dictionary_acquired_item_name(item); - journal_file_update_msg_ut(filename, jf); - -// internal_error(true, -// "updated journal file '%s', type %d, " -// "file modified: %"PRIu64", " -// "msg {first: %"PRIu64", last: %"PRIu64"}", -// filename, jf->source_type, -// jf->file_last_modified_ut, -// jf->msg_first_ut, jf->msg_last_ut); - } - - return false; -} - -#define SDJF_SOURCE_ALL_NAME "all" -#define SDJF_SOURCE_LOCAL_NAME "all-local-logs" -#define SDJF_SOURCE_LOCAL_SYSTEM_NAME "all-local-system-logs" -#define SDJF_SOURCE_LOCAL_USERS_NAME "all-local-user-logs" -#define SDJF_SOURCE_LOCAL_OTHER_NAME "all-uncategorized" -#define SDJF_SOURCE_NAMESPACES_NAME "all-local-namespaces" -#define SDJF_SOURCE_REMOTES_NAME "all-remote-systems" - -struct journal_file_source { - usec_t first_ut; - usec_t last_ut; - size_t count; - uint64_t size; -}; - -static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) { - if(size > 1024ULL * 1024 * 1024 * 1024) - snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0); - else if(size > 1024ULL * 1024 * 1024) - snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0); - else if(size > 1024ULL * 1024) - snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0); - else if(size > 1024ULL) - snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0); - else - snprintfz(dst, dst_len, "%"PRIu64" B", size); -} - -#define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \ - if((remaining) > (duration)) { \ - uint64_t _count = (remaining) / (duration); \ - uint64_t _rem = (remaining) - (_count * (duration)); \ - (pos) += snprintfz(&(dst)[pos], (dst_len) - (pos), "%s%s%"PRIu64" %s", (printed) ? ", " : "", _rem ? "" : "and ", _count, _count > 1 ? (many) : (one)); \ - (remaining) = _rem; \ - (printed) = true; \ - } \ -} while(0) - -static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) { - if(duration_s < 0) - duration_s = -duration_s; - - size_t pos = 0; - dst[0] = 0 ; - - bool printed = false; - print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed); - print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed); - print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed); - print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed); - print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed); - print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed); -} - -static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) { - struct journal_file_source *jfs = entry; - BUFFER *wb = data; - - const char *name = dictionary_acquired_item_name(item); - - buffer_json_add_array_item_object(wb); - { - char size_for_humans[100]; - human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans)); - - char duration_for_humans[1024]; - human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC), - duration_for_humans, sizeof(duration_for_humans)); - - char info[1024]; - snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s", - jfs->count, size_for_humans, duration_for_humans); - - buffer_json_member_add_string(wb, "id", name); - buffer_json_member_add_string(wb, "name", name); - buffer_json_member_add_string(wb, "pill", size_for_humans); - buffer_json_member_add_string(wb, "info", info); - } - buffer_json_object_close(wb); // options object - - return 1; -} - -static bool journal_file_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) { - struct journal_file_source *jfs = old_value, *njfs = new_value; - jfs->count += njfs->count; - jfs->size += njfs->size; - - if(njfs->first_ut && njfs->first_ut < jfs->first_ut) - jfs->first_ut = njfs->first_ut; - - if(njfs->last_ut && njfs->last_ut > jfs->last_ut) - jfs->last_ut = njfs->last_ut; - - return false; -} - -static void available_journal_file_sources_to_json_array(BUFFER *wb) { - DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE); - dictionary_register_conflict_callback(dict, journal_file_merge_sizes, NULL); - - struct journal_file_source t = { 0 }; - - struct journal_file *jf; - dfe_start_read(journal_files_registry, jf) { - t.first_ut = jf->msg_first_ut; - t.last_ut = jf->msg_last_ut; - t.count = 1; - t.size = jf->size; - - dictionary_set(dict, SDJF_SOURCE_ALL_NAME, &t, sizeof(t)); - - if((jf->source_type & (SDJF_LOCAL)) == (SDJF_LOCAL)) - dictionary_set(dict, SDJF_SOURCE_LOCAL_NAME, &t, sizeof(t)); - if((jf->source_type & (SDJF_LOCAL | SDJF_SYSTEM)) == (SDJF_LOCAL | SDJF_SYSTEM)) - dictionary_set(dict, SDJF_SOURCE_LOCAL_SYSTEM_NAME, &t, sizeof(t)); - if((jf->source_type & (SDJF_LOCAL | SDJF_USER)) == (SDJF_LOCAL | SDJF_USER)) - dictionary_set(dict, SDJF_SOURCE_LOCAL_USERS_NAME, &t, sizeof(t)); - if((jf->source_type & (SDJF_LOCAL | SDJF_OTHER)) == (SDJF_LOCAL | SDJF_OTHER)) - dictionary_set(dict, SDJF_SOURCE_LOCAL_OTHER_NAME, &t, sizeof(t)); - if((jf->source_type & (SDJF_NAMESPACE)) == (SDJF_NAMESPACE)) - dictionary_set(dict, SDJF_SOURCE_NAMESPACES_NAME, &t, sizeof(t)); - if((jf->source_type & (SDJF_REMOTE)) == (SDJF_REMOTE)) - dictionary_set(dict, SDJF_SOURCE_REMOTES_NAME, &t, sizeof(t)); - if(jf->source) - dictionary_set(dict, string2str(jf->source), &t, sizeof(t)); - } - dfe_done(jf); - - dictionary_sorted_walkthrough_read(dict, journal_file_to_json_array_cb, wb); - - dictionary_destroy(dict); -} - -static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) { - struct journal_file *jf = value; (void)jf; - const char *filename = dictionary_acquired_item_name(item); (void)filename; - - string_freez(jf->source); - internal_error(true, "removed journal file '%s'", filename); -} - -void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_ut) { - static const char *ext = ".journal"; - static const size_t ext_len = sizeof(".journal") - 1; - - if (depth > VAR_LOG_JOURNAL_MAX_DEPTH) - return; - - DIR *dir; - struct dirent *entry; - struct stat info; - char absolute_path[FILENAME_MAX]; - - // Open the directory. - if ((dir = opendir(dirname)) == NULL) { - if(errno != ENOENT && errno != ENOTDIR) - netdata_log_error("Cannot opendir() '%s'", dirname); - return; - } - - // Read each entry in the directory. - while ((entry = readdir(dir)) != NULL) { - snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name); - if (stat(absolute_path, &info) != 0) { - netdata_log_error("Failed to stat() '%s", absolute_path); - continue; - } - - if (S_ISDIR(info.st_mode)) { - // If entry is a directory, call traverse recursively. - if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) - journal_directory_scan(absolute_path, depth + 1, last_scan_ut); - - } - else if (S_ISREG(info.st_mode)) { - // If entry is a regular file, check if it ends with .journal. - char *filename = entry->d_name; - size_t len = strlen(filename); - - if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) { - struct journal_file t = { - .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC, - .last_scan_ut = last_scan_ut, - .size = info.st_size, - .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT, - }; - dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t)); - } - } - } - - closedir(dir); -} - -static void journal_files_registry_update() { - usec_t scan_ut = now_monotonic_usec(); - - for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES ;i++) { - if(!journal_directories[i].path) - break; - - journal_directory_scan(journal_directories[i].path, 0, scan_ut); - } - - struct journal_file *jf; - dfe_start_write(journal_files_registry, jf) { - if(jf->last_scan_ut < scan_ut) - dictionary_del(journal_files_registry, jf_dfe.name); - } - dfe_done(jf); -} - -// ---------------------------------------------------------------------------- - static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { - if((fqs->source_type == SDJF_ALL || (jf->source_type & fqs->source_type) == fqs->source_type) && - (!fqs->source || fqs->source == jf->source)) { - - usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT; - usec_t first_ut = jf->msg_first_ut; - usec_t last_ut = jf->msg_last_ut + anchor_delta; - - if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut) - return true; - } - - return false; -} - -static int journal_file_dict_items_backward_compar(const void *a, const void *b) { - const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b; - struct journal_file *jfa = dictionary_acquired_item_value(*ad); - struct journal_file *jfb = dictionary_acquired_item_value(*bd); - - if(jfa->msg_last_ut < jfb->msg_last_ut) - return 1; - - if(jfa->msg_last_ut > jfb->msg_last_ut) - return -1; + if((fqs->source_type == SDJF_NONE && !fqs->sources) || (jf->source_type & fqs->source_type) || + (fqs->sources && simple_pattern_matches(fqs->sources, string2str(jf->source)))) { - if(jfa->msg_first_ut < jfb->msg_first_ut) - return 1; + if(!jf->msg_last_ut || !jf->msg_last_ut) + // the file is not scanned yet, or the timestamps have not been updated, + // so we don't know if it can contribute or not - let's add it. + return true; - if(jfa->msg_first_ut > jfb->msg_first_ut) - return -1; + usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT; + usec_t first_ut = jf->msg_first_ut - anchor_delta; + usec_t last_ut = jf->msg_last_ut + anchor_delta; - return 0; -} + if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut) + return true; + } -static int journal_file_dict_items_forward_compar(const void *a, const void *b) { - return -journal_file_dict_items_backward_compar(a, b); + return false; } static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) { @@ -1260,8 +1146,12 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU } bool partial = false; - usec_t started_ut; - usec_t ended_ut = now_monotonic_usec(); + usec_t query_started_ut = now_monotonic_usec(); + usec_t started_ut = query_started_ut; + usec_t ended_ut = started_ut; + usec_t duration_ut = 0, max_duration_ut = 0; + + sampling_query_init(fqs, facets); buffer_json_member_add_array(wb, "_journal_files"); for(size_t f = 0; f < files_used ;f++) { @@ -1271,8 +1161,19 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU if(!jf_is_mine(jf, fqs)) continue; + started_ut = ended_ut; + + // do not even try to do the query if we expect it to pass the timeout + if(ended_ut > (query_started_ut + (fqs->stop_monotonic_ut - query_started_ut) * 3 / 4) && + ended_ut + max_duration_ut * 2 >= fqs->stop_monotonic_ut) { + + partial = true; + status = ND_SD_JOURNAL_TIMED_OUT; + break; + } + fqs->file_working++; - fqs->cached_count = 0; + // fqs->cached_count = 0; size_t fs_calls = fstat_thread_calls; size_t fs_cached = fstat_thread_cached_responses; @@ -1281,8 +1182,22 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU size_t bytes_read = fqs->bytes_read; size_t matches_setup_ut = fqs->matches_setup_ut; + sampling_file_init(fqs, jf); + ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs); +// nd_log(NDLS_COLLECTORS, NDLP_INFO, +// "JOURNAL ESTIMATION FINAL: '%s' " +// "total lines %zu [sampled=%zu, unsampled=%zu, estimated=%zu], " +// "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], " +// "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], " +// , jf->filename +// , fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled + fqs->samples_per_file.estimated +// , fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated +// , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file +// , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut +// ); + rows_useful = fqs->rows_useful - rows_useful; rows_read = fqs->rows_read - rows_read; bytes_read = fqs->bytes_read - bytes_read; @@ -1290,9 +1205,11 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU fs_calls = fstat_thread_calls - fs_calls; fs_cached = fstat_thread_cached_responses - fs_cached; - started_ut = ended_ut; ended_ut = now_monotonic_usec(); - usec_t duration_ut = ended_ut - started_ut; + duration_ut = ended_ut - started_ut; + + if(duration_ut > max_duration_ut) + max_duration_ut = duration_ut; buffer_json_add_array_item_object(wb); // journal file { @@ -1315,6 +1232,16 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut); buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls); buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached); + + if(fqs->sampling) { + buffer_json_member_add_object(wb, "_sampling"); + { + buffer_json_member_add_uint64(wb, "sampled", fqs->samples_per_file.sampled); + buffer_json_member_add_uint64(wb, "unsampled", fqs->samples_per_file.unsampled); + buffer_json_member_add_uint64(wb, "estimated", fqs->samples_per_file.estimated); + } + buffer_json_object_close(wb); // _sampling + } } buffer_json_object_close(wb); // journal file @@ -1384,6 +1311,64 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU buffer_json_member_add_boolean(wb, "partial", partial); buffer_json_member_add_string(wb, "type", "table"); + // build a message for the query + if(!fqs->data_only) { + CLEAN_BUFFER *msg = buffer_create(0, NULL); + CLEAN_BUFFER *msg_description = buffer_create(0, NULL); + ND_LOG_FIELD_PRIORITY msg_priority = NDLP_INFO; + + if(!journal_files_completed_once()) { + buffer_strcat(msg, "Journals are still being scanned. "); + buffer_strcat(msg_description + , "LIBRARY SCAN: The journal files are still being scanned, you are probably viewing incomplete data. "); + msg_priority = NDLP_WARNING; + } + + if(partial) { + buffer_strcat(msg, "Query timed-out, incomplete data. "); + buffer_strcat(msg_description + , "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. "); + msg_priority = NDLP_WARNING; + } + + if(fqs->samples.estimated || fqs->samples.unsampled) { + double percent = (double) (fqs->samples.sampled * 100.0 / + (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled)); + buffer_sprintf(msg, "%.2f%% real data", percent); + buffer_sprintf(msg_description, "ACTUAL DATA: The filters counters reflect %0.2f%% of the data. ", percent); + msg_priority = MIN(msg_priority, NDLP_NOTICE); + } + + if(fqs->samples.unsampled) { + double percent = (double) (fqs->samples.unsampled * 100.0 / + (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled)); + buffer_sprintf(msg, ", %.2f%% unsampled", percent); + buffer_sprintf(msg_description + , "UNSAMPLED DATA: %0.2f%% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. " + , percent); + msg_priority = MIN(msg_priority, NDLP_NOTICE); + } + + if(fqs->samples.estimated) { + double percent = (double) (fqs->samples.estimated * 100.0 / + (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled)); + buffer_sprintf(msg, ", %.2f%% estimated", percent); + buffer_sprintf(msg_description + , "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by %0.2f%%. " + , percent); + msg_priority = MIN(msg_priority, NDLP_NOTICE); + } + + buffer_json_member_add_object(wb, "message"); + if(buffer_tostring(msg)) { + buffer_json_member_add_string(wb, "title", buffer_tostring(msg)); + buffer_json_member_add_string(wb, "description", buffer_tostring(msg_description)); + buffer_json_member_add_string(wb, "status", nd_log_id2priority(msg_priority)); + } + // else send an empty object if there is nothing to tell + buffer_json_object_close(wb); // message + } + if(!fqs->data_only) { buffer_json_member_add_time_t(wb, "update_every", 1); buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); @@ -1403,6 +1388,17 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses); } buffer_json_object_close(wb); // _fstat_caching + + if(fqs->sampling) { + buffer_json_member_add_object(wb, "_sampling"); + { + buffer_json_member_add_uint64(wb, "sampled", fqs->samples.sampled); + buffer_json_member_add_uint64(wb, "unsampled", fqs->samples.unsampled); + buffer_json_member_add_uint64(wb, "estimated", fqs->samples.estimated); + } + buffer_json_object_close(wb); // _sampling + } + buffer_json_finalize(wb); return HTTP_RESP_OK; @@ -1471,6 +1467,10 @@ static void netdata_systemd_journal_function_help(const char *transaction) { " The number of items to return.\n" " The default is %d.\n" "\n" + " "JOURNAL_PARAMETER_SAMPLING":ITEMS\n" + " The number of log entries to sample to estimate facets counters and histogram.\n" + " The default is %d.\n" + "\n" " "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n" " Return items relative to this timestamp.\n" " The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n" @@ -1511,6 +1511,7 @@ static void netdata_systemd_journal_function_help(const char *transaction) { , JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY + , SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING , JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward" ); @@ -1521,572 +1522,6 @@ static void netdata_systemd_journal_function_help(const char *transaction) { buffer_free(wb); } -const char *errno_map[] = { - [1] = "1 (EPERM)", // "Operation not permitted", - [2] = "2 (ENOENT)", // "No such file or directory", - [3] = "3 (ESRCH)", // "No such process", - [4] = "4 (EINTR)", // "Interrupted system call", - [5] = "5 (EIO)", // "Input/output error", - [6] = "6 (ENXIO)", // "No such device or address", - [7] = "7 (E2BIG)", // "Argument list too long", - [8] = "8 (ENOEXEC)", // "Exec format error", - [9] = "9 (EBADF)", // "Bad file descriptor", - [10] = "10 (ECHILD)", // "No child processes", - [11] = "11 (EAGAIN)", // "Resource temporarily unavailable", - [12] = "12 (ENOMEM)", // "Cannot allocate memory", - [13] = "13 (EACCES)", // "Permission denied", - [14] = "14 (EFAULT)", // "Bad address", - [15] = "15 (ENOTBLK)", // "Block device required", - [16] = "16 (EBUSY)", // "Device or resource busy", - [17] = "17 (EEXIST)", // "File exists", - [18] = "18 (EXDEV)", // "Invalid cross-device link", - [19] = "19 (ENODEV)", // "No such device", - [20] = "20 (ENOTDIR)", // "Not a directory", - [21] = "21 (EISDIR)", // "Is a directory", - [22] = "22 (EINVAL)", // "Invalid argument", - [23] = "23 (ENFILE)", // "Too many open files in system", - [24] = "24 (EMFILE)", // "Too many open files", - [25] = "25 (ENOTTY)", // "Inappropriate ioctl for device", - [26] = "26 (ETXTBSY)", // "Text file busy", - [27] = "27 (EFBIG)", // "File too large", - [28] = "28 (ENOSPC)", // "No space left on device", - [29] = "29 (ESPIPE)", // "Illegal seek", - [30] = "30 (EROFS)", // "Read-only file system", - [31] = "31 (EMLINK)", // "Too many links", - [32] = "32 (EPIPE)", // "Broken pipe", - [33] = "33 (EDOM)", // "Numerical argument out of domain", - [34] = "34 (ERANGE)", // "Numerical result out of range", - [35] = "35 (EDEADLK)", // "Resource deadlock avoided", - [36] = "36 (ENAMETOOLONG)", // "File name too long", - [37] = "37 (ENOLCK)", // "No locks available", - [38] = "38 (ENOSYS)", // "Function not implemented", - [39] = "39 (ENOTEMPTY)", // "Directory not empty", - [40] = "40 (ELOOP)", // "Too many levels of symbolic links", - [42] = "42 (ENOMSG)", // "No message of desired type", - [43] = "43 (EIDRM)", // "Identifier removed", - [44] = "44 (ECHRNG)", // "Channel number out of range", - [45] = "45 (EL2NSYNC)", // "Level 2 not synchronized", - [46] = "46 (EL3HLT)", // "Level 3 halted", - [47] = "47 (EL3RST)", // "Level 3 reset", - [48] = "48 (ELNRNG)", // "Link number out of range", - [49] = "49 (EUNATCH)", // "Protocol driver not attached", - [50] = "50 (ENOCSI)", // "No CSI structure available", - [51] = "51 (EL2HLT)", // "Level 2 halted", - [52] = "52 (EBADE)", // "Invalid exchange", - [53] = "53 (EBADR)", // "Invalid request descriptor", - [54] = "54 (EXFULL)", // "Exchange full", - [55] = "55 (ENOANO)", // "No anode", - [56] = "56 (EBADRQC)", // "Invalid request code", - [57] = "57 (EBADSLT)", // "Invalid slot", - [59] = "59 (EBFONT)", // "Bad font file format", - [60] = "60 (ENOSTR)", // "Device not a stream", - [61] = "61 (ENODATA)", // "No data available", - [62] = "62 (ETIME)", // "Timer expired", - [63] = "63 (ENOSR)", // "Out of streams resources", - [64] = "64 (ENONET)", // "Machine is not on the network", - [65] = "65 (ENOPKG)", // "Package not installed", - [66] = "66 (EREMOTE)", // "Object is remote", - [67] = "67 (ENOLINK)", // "Link has been severed", - [68] = "68 (EADV)", // "Advertise error", - [69] = "69 (ESRMNT)", // "Srmount error", - [70] = "70 (ECOMM)", // "Communication error on send", - [71] = "71 (EPROTO)", // "Protocol error", - [72] = "72 (EMULTIHOP)", // "Multihop attempted", - [73] = "73 (EDOTDOT)", // "RFS specific error", - [74] = "74 (EBADMSG)", // "Bad message", - [75] = "75 (EOVERFLOW)", // "Value too large for defined data type", - [76] = "76 (ENOTUNIQ)", // "Name not unique on network", - [77] = "77 (EBADFD)", // "File descriptor in bad state", - [78] = "78 (EREMCHG)", // "Remote address changed", - [79] = "79 (ELIBACC)", // "Can not access a needed shared library", - [80] = "80 (ELIBBAD)", // "Accessing a corrupted shared library", - [81] = "81 (ELIBSCN)", // ".lib section in a.out corrupted", - [82] = "82 (ELIBMAX)", // "Attempting to link in too many shared libraries", - [83] = "83 (ELIBEXEC)", // "Cannot exec a shared library directly", - [84] = "84 (EILSEQ)", // "Invalid or incomplete multibyte or wide character", - [85] = "85 (ERESTART)", // "Interrupted system call should be restarted", - [86] = "86 (ESTRPIPE)", // "Streams pipe error", - [87] = "87 (EUSERS)", // "Too many users", - [88] = "88 (ENOTSOCK)", // "Socket operation on non-socket", - [89] = "89 (EDESTADDRREQ)", // "Destination address required", - [90] = "90 (EMSGSIZE)", // "Message too long", - [91] = "91 (EPROTOTYPE)", // "Protocol wrong type for socket", - [92] = "92 (ENOPROTOOPT)", // "Protocol not available", - [93] = "93 (EPROTONOSUPPORT)", // "Protocol not supported", - [94] = "94 (ESOCKTNOSUPPORT)", // "Socket type not supported", - [95] = "95 (ENOTSUP)", // "Operation not supported", - [96] = "96 (EPFNOSUPPORT)", // "Protocol family not supported", - [97] = "97 (EAFNOSUPPORT)", // "Address family not supported by protocol", - [98] = "98 (EADDRINUSE)", // "Address already in use", - [99] = "99 (EADDRNOTAVAIL)", // "Cannot assign requested address", - [100] = "100 (ENETDOWN)", // "Network is down", - [101] = "101 (ENETUNREACH)", // "Network is unreachable", - [102] = "102 (ENETRESET)", // "Network dropped connection on reset", - [103] = "103 (ECONNABORTED)", // "Software caused connection abort", - [104] = "104 (ECONNRESET)", // "Connection reset by peer", - [105] = "105 (ENOBUFS)", // "No buffer space available", - [106] = "106 (EISCONN)", // "Transport endpoint is already connected", - [107] = "107 (ENOTCONN)", // "Transport endpoint is not connected", - [108] = "108 (ESHUTDOWN)", // "Cannot send after transport endpoint shutdown", - [109] = "109 (ETOOMANYREFS)", // "Too many references: cannot splice", - [110] = "110 (ETIMEDOUT)", // "Connection timed out", - [111] = "111 (ECONNREFUSED)", // "Connection refused", - [112] = "112 (EHOSTDOWN)", // "Host is down", - [113] = "113 (EHOSTUNREACH)", // "No route to host", - [114] = "114 (EALREADY)", // "Operation already in progress", - [115] = "115 (EINPROGRESS)", // "Operation now in progress", - [116] = "116 (ESTALE)", // "Stale file handle", - [117] = "117 (EUCLEAN)", // "Structure needs cleaning", - [118] = "118 (ENOTNAM)", // "Not a XENIX named type file", - [119] = "119 (ENAVAIL)", // "No XENIX semaphores available", - [120] = "120 (EISNAM)", // "Is a named type file", - [121] = "121 (EREMOTEIO)", // "Remote I/O error", - [122] = "122 (EDQUOT)", // "Disk quota exceeded", - [123] = "123 (ENOMEDIUM)", // "No medium found", - [124] = "124 (EMEDIUMTYPE)", // "Wrong medium type", - [125] = "125 (ECANCELED)", // "Operation canceled", - [126] = "126 (ENOKEY)", // "Required key not available", - [127] = "127 (EKEYEXPIRED)", // "Key has expired", - [128] = "128 (EKEYREVOKED)", // "Key has been revoked", - [129] = "129 (EKEYREJECTED)", // "Key was rejected by service", - [130] = "130 (EOWNERDEAD)", // "Owner died", - [131] = "131 (ENOTRECOVERABLE)", // "State not recoverable", - [132] = "132 (ERFKILL)", // "Operation not possible due to RF-kill", - [133] = "133 (EHWPOISON)", // "Memory page has hardware error", -}; - -static const char *syslog_facility_to_name(int facility) { - switch (facility) { - case LOG_FAC(LOG_KERN): return "kern"; - case LOG_FAC(LOG_USER): return "user"; - case LOG_FAC(LOG_MAIL): return "mail"; - case LOG_FAC(LOG_DAEMON): return "daemon"; - case LOG_FAC(LOG_AUTH): return "auth"; - case LOG_FAC(LOG_SYSLOG): return "syslog"; - case LOG_FAC(LOG_LPR): return "lpr"; - case LOG_FAC(LOG_NEWS): return "news"; - case LOG_FAC(LOG_UUCP): return "uucp"; - case LOG_FAC(LOG_CRON): return "cron"; - case LOG_FAC(LOG_AUTHPRIV): return "authpriv"; - case LOG_FAC(LOG_FTP): return "ftp"; - case LOG_FAC(LOG_LOCAL0): return "local0"; - case LOG_FAC(LOG_LOCAL1): return "local1"; - case LOG_FAC(LOG_LOCAL2): return "local2"; - case LOG_FAC(LOG_LOCAL3): return "local3"; - case LOG_FAC(LOG_LOCAL4): return "local4"; - case LOG_FAC(LOG_LOCAL5): return "local5"; - case LOG_FAC(LOG_LOCAL6): return "local6"; - case LOG_FAC(LOG_LOCAL7): return "local7"; - default: return NULL; - } -} - -static const char *syslog_priority_to_name(int priority) { - switch (priority) { - case LOG_ALERT: return "alert"; - case LOG_CRIT: return "critical"; - case LOG_DEBUG: return "debug"; - case LOG_EMERG: return "panic"; - case LOG_ERR: return "error"; - case LOG_INFO: return "info"; - case LOG_NOTICE: return "notice"; - case LOG_WARNING: return "warning"; - default: return NULL; - } -} - -static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets __maybe_unused, FACET_ROW *row, void *data __maybe_unused) { - // same to - // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501 - // function get_log_colors() - - FACET_ROW_KEY_VALUE *priority_rkv = dictionary_get(row->dict, "PRIORITY"); - if(!priority_rkv || priority_rkv->empty) - return FACET_ROW_SEVERITY_NORMAL; - - int priority = str2i(buffer_tostring(priority_rkv->wb)); - - if(priority <= LOG_ERR) - return FACET_ROW_SEVERITY_CRITICAL; - - else if (priority <= LOG_WARNING) - return FACET_ROW_SEVERITY_WARNING; - - else if(priority <= LOG_NOTICE) - return FACET_ROW_SEVERITY_NOTICE; - - else if(priority >= LOG_DEBUG) - return FACET_ROW_SEVERITY_DEBUG; - - return FACET_ROW_SEVERITY_NORMAL; -} - -static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { - static __thread char tmp[1024 + 1]; - struct passwd pw, *result = NULL; - - if (getpwuid_r(uid, &pw, tmp, sizeof(tmp), &result) != 0 || !result || !pw.pw_name || !(*pw.pw_name)) - snprintfz(buffer, buffer_size - 1, "%u", uid); - else - snprintfz(buffer, buffer_size - 1, "%u (%s)", uid, pw.pw_name); - - return buffer; -} - -static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) { - static __thread char tmp[1024]; - struct group grp, *result = NULL; - - if (getgrgid_r(gid, &grp, tmp, sizeof(tmp), &result) != 0 || !result || !grp.gr_name || !(*grp.gr_name)) - snprintfz(buffer, buffer_size - 1, "%u", gid); - else - snprintfz(buffer, buffer_size - 1, "%u (%s)", gid, grp.gr_name); - - return buffer; -} - -static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - int facility = str2i(buffer_tostring(wb)); - const char *name = syslog_facility_to_name(facility); - if (name) { - buffer_flush(wb); - buffer_strcat(wb, name); - } - } -} - -static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - if(scope == FACETS_TRANSFORM_FACET_SORT) - return; - - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - int priority = str2i(buffer_tostring(wb)); - const char *name = syslog_priority_to_name(priority); - if (name) { - buffer_flush(wb); - buffer_strcat(wb, name); - } - } -} - -static void netdata_systemd_journal_transform_errno(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - if(scope == FACETS_TRANSFORM_FACET_SORT) - return; - - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - unsigned err_no = str2u(buffer_tostring(wb)); - if(err_no > 0 && err_no < sizeof(errno_map) / sizeof(*errno_map)) { - const char *name = errno_map[err_no]; - if(name) { - buffer_flush(wb); - buffer_strcat(wb, name); - } - } - } -} - -// ---------------------------------------------------------------------------- -// UID and GID transformation - -#define UID_GID_HASHTABLE_SIZE 10000 - -struct word_t2str_hashtable_entry { - struct word_t2str_hashtable_entry *next; - Word_t hash; - size_t len; - char str[]; -}; - -struct word_t2str_hashtable { - SPINLOCK spinlock; - size_t size; - struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE]; -}; - -struct word_t2str_hashtable uid_hashtable = { - .size = UID_GID_HASHTABLE_SIZE, -}; - -struct word_t2str_hashtable gid_hashtable = { - .size = UID_GID_HASHTABLE_SIZE, -}; - -struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) { - size_t slot = hash % ht->size; - struct word_t2str_hashtable_entry **e = &ht->hashtable[slot]; - - while(*e && (*e)->hash != hash) - e = &((*e)->next); - - return e; -} - -const char *uid_to_username_cached(uid_t uid, size_t *length) { - spinlock_lock(&uid_hashtable.spinlock); - - struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid); - if(!(*e)) { - static __thread char buf[1024]; - const char *name = uid_to_username(uid, buf, sizeof(buf)); - size_t size = strlen(name) + 1; - - *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); - (*e)->len = size - 1; - (*e)->hash = uid; - memcpy((*e)->str, name, size); - } - - spinlock_unlock(&uid_hashtable.spinlock); - - *length = (*e)->len; - return (*e)->str; -} - -const char *gid_to_groupname_cached(gid_t gid, size_t *length) { - spinlock_lock(&gid_hashtable.spinlock); - - struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid); - if(!(*e)) { - static __thread char buf[1024]; - const char *name = gid_to_groupname(gid, buf, sizeof(buf)); - size_t size = strlen(name) + 1; - - *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); - (*e)->len = size - 1; - (*e)->hash = gid; - memcpy((*e)->str, name, size); - } - - spinlock_unlock(&gid_hashtable.spinlock); - - *length = (*e)->len; - return (*e)->str; -} - -DICTIONARY *boot_ids_to_first_ut = NULL; - -static void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - const char *boot_id = buffer_tostring(wb); - if(*boot_id && isxdigit(*boot_id)) { - usec_t ut = UINT64_MAX; - usec_t *p_ut = dictionary_get(boot_ids_to_first_ut, boot_id); - if(!p_ut) { - struct journal_file *jf; - dfe_start_read(journal_files_registry, jf) { - const char *files[2] = { - [0] = jf_dfe.name, - [1] = NULL, - }; - - sd_journal *j = NULL; - if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) - continue; - - char m[100]; - size_t len = snprintfz(m, sizeof(m), "_BOOT_ID=%s", boot_id); - usec_t t_ut = 0; - if(sd_journal_add_match(j, m, len) < 0 || - sd_journal_seek_head(j) < 0 || - sd_journal_next(j) < 0 || - sd_journal_get_realtime_usec(j, &t_ut) < 0 || !t_ut) { - sd_journal_close(j); - continue; - } - - if(t_ut < ut) - ut = t_ut; - - sd_journal_close(j); - } - dfe_done(jf); - - dictionary_set(boot_ids_to_first_ut, boot_id, &ut, sizeof(ut)); - } - else - ut = *p_ut; - - if(ut != UINT64_MAX) { - time_t timestamp_sec = (time_t)(ut / USEC_PER_SEC); - struct tm tm; - char buffer[30]; - - gmtime_r(×tamp_sec, &tm); - strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm); - - switch(scope) { - default: - case FACETS_TRANSFORM_DATA: - case FACETS_TRANSFORM_VALUE: - buffer_sprintf(wb, " (%s UTC) ", buffer); - break; - - case FACETS_TRANSFORM_FACET: - case FACETS_TRANSFORM_FACET_SORT: - case FACETS_TRANSFORM_HISTOGRAM: - buffer_flush(wb); - buffer_sprintf(wb, "%s UTC", buffer); - break; - } - } - } -} - -static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - if(scope == FACETS_TRANSFORM_FACET_SORT) - return; - - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - uid_t uid = str2i(buffer_tostring(wb)); - size_t len; - const char *name = uid_to_username_cached(uid, &len); - buffer_contents_replace(wb, name, len); - } -} - -static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - if(scope == FACETS_TRANSFORM_FACET_SORT) - return; - - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - gid_t gid = str2i(buffer_tostring(wb)); - size_t len; - const char *name = gid_to_groupname_cached(gid, &len); - buffer_contents_replace(wb, name, len); - } -} - -const char *linux_capabilities[] = { - [CAP_CHOWN] = "CHOWN", - [CAP_DAC_OVERRIDE] = "DAC_OVERRIDE", - [CAP_DAC_READ_SEARCH] = "DAC_READ_SEARCH", - [CAP_FOWNER] = "FOWNER", - [CAP_FSETID] = "FSETID", - [CAP_KILL] = "KILL", - [CAP_SETGID] = "SETGID", - [CAP_SETUID] = "SETUID", - [CAP_SETPCAP] = "SETPCAP", - [CAP_LINUX_IMMUTABLE] = "LINUX_IMMUTABLE", - [CAP_NET_BIND_SERVICE] = "NET_BIND_SERVICE", - [CAP_NET_BROADCAST] = "NET_BROADCAST", - [CAP_NET_ADMIN] = "NET_ADMIN", - [CAP_NET_RAW] = "NET_RAW", - [CAP_IPC_LOCK] = "IPC_LOCK", - [CAP_IPC_OWNER] = "IPC_OWNER", - [CAP_SYS_MODULE] = "SYS_MODULE", - [CAP_SYS_RAWIO] = "SYS_RAWIO", - [CAP_SYS_CHROOT] = "SYS_CHROOT", - [CAP_SYS_PTRACE] = "SYS_PTRACE", - [CAP_SYS_PACCT] = "SYS_PACCT", - [CAP_SYS_ADMIN] = "SYS_ADMIN", - [CAP_SYS_BOOT] = "SYS_BOOT", - [CAP_SYS_NICE] = "SYS_NICE", - [CAP_SYS_RESOURCE] = "SYS_RESOURCE", - [CAP_SYS_TIME] = "SYS_TIME", - [CAP_SYS_TTY_CONFIG] = "SYS_TTY_CONFIG", - [CAP_MKNOD] = "MKNOD", - [CAP_LEASE] = "LEASE", - [CAP_AUDIT_WRITE] = "AUDIT_WRITE", - [CAP_AUDIT_CONTROL] = "AUDIT_CONTROL", - [CAP_SETFCAP] = "SETFCAP", - [CAP_MAC_OVERRIDE] = "MAC_OVERRIDE", - [CAP_MAC_ADMIN] = "MAC_ADMIN", - [CAP_SYSLOG] = "SYSLOG", - [CAP_WAKE_ALARM] = "WAKE_ALARM", - [CAP_BLOCK_SUSPEND] = "BLOCK_SUSPEND", - [37 /*CAP_AUDIT_READ*/] = "AUDIT_READ", - [38 /*CAP_PERFMON*/] = "PERFMON", - [39 /*CAP_BPF*/] = "BPF", - [40 /* CAP_CHECKPOINT_RESTORE */] = "CHECKPOINT_RESTORE", -}; - -static void netdata_systemd_journal_transform_cap_effective(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - if(scope == FACETS_TRANSFORM_FACET_SORT) - return; - - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - uint64_t cap = strtoul(buffer_tostring(wb), NULL, 16); - if(cap) { - buffer_fast_strcat(wb, " (", 2); - for (size_t i = 0, added = 0; i < sizeof(linux_capabilities) / sizeof(linux_capabilities[0]); i++) { - if (linux_capabilities[i] && (cap & (1ULL << i))) { - - if (added) - buffer_fast_strcat(wb, " | ", 3); - - buffer_strcat(wb, linux_capabilities[i]); - added++; - } - } - buffer_fast_strcat(wb, ")", 1); - } - } -} - -static void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { - if(scope == FACETS_TRANSFORM_FACET_SORT) - return; - - const char *v = buffer_tostring(wb); - if(*v && isdigit(*v)) { - uint64_t ut = str2ull(buffer_tostring(wb), NULL); - if(ut) { - time_t timestamp_sec = ut / USEC_PER_SEC; - struct tm tm; - char buffer[30]; - - gmtime_r(×tamp_sec, &tm); - strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm); - buffer_sprintf(wb, " (%s.%06llu UTC)", buffer, ut % USEC_PER_SEC); - } - } -} - -// ---------------------------------------------------------------------------- - -static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { - FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID"); - const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET; - - const char *identifier = NULL; - FACET_ROW_KEY_VALUE *container_name_rkv = dictionary_get(row->dict, "CONTAINER_NAME"); - if(container_name_rkv && !container_name_rkv->empty) - identifier = buffer_tostring(container_name_rkv->wb); - - if(!identifier) { - FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER"); - if(syslog_identifier_rkv && !syslog_identifier_rkv->empty) - identifier = buffer_tostring(syslog_identifier_rkv->wb); - - if(!identifier) { - FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM"); - if(comm_rkv && !comm_rkv->empty) - identifier = buffer_tostring(comm_rkv->wb); - } - } - - buffer_flush(rkv->wb); - - if(!identifier) - buffer_strcat(rkv->wb, FACET_VALUE_UNSET); - else - buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid); - - buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb)); -} - -static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) { - buffer_json_add_array_item_object(json_array); - buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb)); - buffer_json_object_close(json_array); -} - DICTIONARY *function_query_status_dict = NULL; static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) { @@ -2129,7 +1564,7 @@ static void function_systemd_journal_progress(BUFFER *wb, const char *transactio buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut); buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched); char msg[1024 + 1]; - snprintfz(msg, 1024, + snprintfz(msg, sizeof(msg) - 1, "Read %zu rows (%0.0f rows/s), " "data %0.1f MB (%0.1f MB/s), " "file %zu of %zu", @@ -2147,10 +1582,9 @@ static void function_systemd_journal_progress(BUFFER *wb, const char *transactio dictionary_acquired_item_release(function_query_status_dict, item); } -static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { +void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { fstat_thread_calls = 0; fstat_thread_cached_responses = 0; - journal_files_registry_update(); BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); @@ -2186,6 +1620,7 @@ static void function_systemd_journal(const char *transaction, char *function, in facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS); facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA); facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL); + facets_accepted_param(facets, JOURNAL_PARAMETER_SAMPLING); #ifdef HAVE_SD_JOURNAL_RESTART_FIELDS facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE); @@ -2196,10 +1631,10 @@ static void function_systemd_journal(const char *transaction, char *function, in facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL); facets_register_key_name(facets, "_HOSTNAME", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE); facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS, - FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE, netdata_systemd_journal_dynamic_row_id, NULL); facets_register_key_name(facets, "MESSAGE", @@ -2212,71 +1647,78 @@ static void function_systemd_journal(const char *transaction, char *function, in // netdata_systemd_journal_rich_message, NULL); facets_register_key_name_transformation(facets, "PRIORITY", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW | + FACET_KEY_OPTION_EXPANDED_FILTER, netdata_systemd_journal_transform_priority, NULL); facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW | + FACET_KEY_OPTION_EXPANDED_FILTER, netdata_systemd_journal_transform_syslog_facility, NULL); facets_register_key_name_transformation(facets, "ERRNO", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_errno, NULL); facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE, FACET_KEY_OPTION_NEVER_FACET); facets_register_key_name(facets, "SYSLOG_IDENTIFIER", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + FACET_KEY_OPTION_FACET); facets_register_key_name(facets, "UNIT", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + FACET_KEY_OPTION_FACET); facets_register_key_name(facets, "USER_UNIT", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + FACET_KEY_OPTION_FACET); + + facets_register_key_name_transformation(facets, "MESSAGE_ID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW | + FACET_KEY_OPTION_EXPANDED_FILTER, + netdata_systemd_journal_transform_message_id, NULL); facets_register_key_name_transformation(facets, "_BOOT_ID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_boot_id, NULL); facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "_UID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "OBJECT_UID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "_GID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_gid, NULL); facets_register_key_name_transformation(facets, "OBJECT_GID", - FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_gid, NULL); facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE", - FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_cap_effective, NULL); facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID", - FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID", - FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_uid, NULL); facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP", - FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + FACET_KEY_OPTION_TRANSFORM_VIEW, netdata_systemd_journal_transform_timestamp_usec, NULL); // ------------------------------------------------------------------------ @@ -2290,10 +1732,11 @@ static void function_systemd_journal(const char *transaction, char *function, in FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION; const char *query = NULL; const char *chart = NULL; - const char *source = NULL; + SIMPLE_PATTERN *sources = NULL; const char *progress_id = NULL; SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL; size_t filters = 0; + size_t sampling = SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING; buffer_json_member_add_object(wb, "_request"); @@ -2329,6 +1772,9 @@ static void function_systemd_journal(const char *transaction, char *function, in else tail = true; } + else if(strncmp(keyword, JOURNAL_PARAMETER_SAMPLING ":", sizeof(JOURNAL_PARAMETER_SAMPLING ":") - 1) == 0) { + sampling = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_SAMPLING ":") - 1]); + } else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) { char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1]; @@ -2352,40 +1798,67 @@ static void function_systemd_journal(const char *transaction, char *function, in progress_id = id; } else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) { - source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; + const char *value = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; - if(strcmp(source, SDJF_SOURCE_ALL_NAME) == 0) { - source_type = SDJF_ALL; - source = NULL; - } - else if(strcmp(source, SDJF_SOURCE_LOCAL_NAME) == 0) { - source_type = SDJF_LOCAL; - source = NULL; - } - else if(strcmp(source, SDJF_SOURCE_REMOTES_NAME) == 0) { - source_type = SDJF_REMOTE; - source = NULL; - } - else if(strcmp(source, SDJF_SOURCE_NAMESPACES_NAME) == 0) { - source_type = SDJF_NAMESPACE; - source = NULL; - } - else if(strcmp(source, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) { - source_type = SDJF_LOCAL | SDJF_SYSTEM; - source = NULL; - } - else if(strcmp(source, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) { - source_type = SDJF_LOCAL | SDJF_USER; - source = NULL; - } - else if(strcmp(source, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) { - source_type = SDJF_LOCAL | SDJF_OTHER; - source = NULL; + buffer_json_member_add_array(wb, JOURNAL_PARAMETER_SOURCE); + + BUFFER *sources_list = buffer_create(0, NULL); + + source_type = SDJF_NONE; + while(value) { + char *sep = strchr(value, ','); + if(sep) + *sep++ = '\0'; + + buffer_json_add_array_item_string(wb, value); + + if(strcmp(value, SDJF_SOURCE_ALL_NAME) == 0) { + source_type |= SDJF_ALL; + value = NULL; + } + else if(strcmp(value, SDJF_SOURCE_LOCAL_NAME) == 0) { + source_type |= SDJF_LOCAL_ALL; + value = NULL; + } + else if(strcmp(value, SDJF_SOURCE_REMOTES_NAME) == 0) { + source_type |= SDJF_REMOTE_ALL; + value = NULL; + } + else if(strcmp(value, SDJF_SOURCE_NAMESPACES_NAME) == 0) { + source_type |= SDJF_LOCAL_NAMESPACE; + value = NULL; + } + else if(strcmp(value, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) { + source_type |= SDJF_LOCAL_SYSTEM; + value = NULL; + } + else if(strcmp(value, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) { + source_type |= SDJF_LOCAL_USER; + value = NULL; + } + else if(strcmp(value, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) { + source_type |= SDJF_LOCAL_OTHER; + value = NULL; + } + else { + // else, match the source, whatever it is + if(buffer_strlen(sources_list)) + buffer_strcat(sources_list, ","); + + buffer_strcat(sources_list, value); + } + + value = sep; } - else { - source_type = SDJF_ALL; - // else, match the source, whatever it is + + if(buffer_strlen(sources_list)) { + simple_pattern_free(sources); + sources = simple_pattern_create(buffer_tostring(sources_list), ",", SIMPLE_PATTERN_EXACT, false); } + + buffer_free(sources_list); + + buffer_json_array_close(wb); // source } else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) { after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]); @@ -2502,7 +1975,7 @@ static void function_systemd_journal(const char *transaction, char *function, in fqs->data_only = data_only; fqs->delta = (fqs->data_only) ? delta : false; fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false; - fqs->source = string_strdupz(source); + fqs->sources = sources; fqs->source_type = source_type; fqs->entries = last; fqs->last_modified = 0; @@ -2512,6 +1985,7 @@ static void function_systemd_journal(const char *transaction, char *function, in fqs->direction = direction; fqs->anchor.start_ut = anchor; fqs->anchor.stop_ut = 0; + fqs->sampling = sampling; if(fqs->anchor.start_ut && fqs->tail) { // a tail request @@ -2574,8 +2048,8 @@ static void function_systemd_journal(const char *transaction, char *function, in buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false); buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta); buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_SAMPLING, fqs->sampling); buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id); - buffer_json_member_add_string(wb, JOURNAL_PARAMETER_SOURCE, string2str(fqs->source)); buffer_json_member_add_uint64(wb, "source_type", fqs->source_type); buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC); buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC); @@ -2603,7 +2077,7 @@ static void function_systemd_journal(const char *transaction, char *function, in buffer_json_member_add_string(wb, "id", "source"); buffer_json_member_add_string(wb, "name", "source"); buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query"); - buffer_json_member_add_string(wb, "type", "select"); + buffer_json_member_add_string(wb, "type", "multiselect"); buffer_json_member_add_array(wb, "options"); { available_journal_file_sources_to_json_array(wb); @@ -2631,12 +2105,6 @@ static void function_systemd_journal(const char *transaction, char *function, in response = netdata_systemd_journal_query(wb, facets, fqs); - // ------------------------------------------------------------------------ - // cleanup query params - - string_freez(fqs->source); - fqs->source = NULL; - // ------------------------------------------------------------------------ // handle error response @@ -2653,6 +2121,7 @@ output: netdata_mutex_unlock(&stdout_mutex); cleanup: + simple_pattern_free(sources); facets_destroy(facets); buffer_free(wb); @@ -2663,129 +2132,8 @@ cleanup: } } -// ---------------------------------------------------------------------------- - -int main(int argc __maybe_unused, char **argv __maybe_unused) { - stderror = stderr; - clocks_init(); - - program_name = "systemd-journal.plugin"; - - // disable syslog - error_log_syslog = 0; - - // set errors flood protection to 100 logs per hour - error_log_errors_per_period = 100; - error_log_throttle_period = 3600; - - log_set_global_severity_for_external_plugins(); - - netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); - if(verify_netdata_host_prefix() == -1) exit(1); - - // ------------------------------------------------------------------------ - // setup the journal directories - - unsigned d = 0; - - journal_directories[d++].path = strdupz("/var/log/journal"); - journal_directories[d++].path = strdupz("/run/log/journal"); - - if(*netdata_configured_host_prefix) { - char path[PATH_MAX]; - snprintfz(path, sizeof(path), "%s/var/log/journal", netdata_configured_host_prefix); - journal_directories[d++].path = strdupz(path); - snprintfz(path, sizeof(path), "%s/run/log/journal", netdata_configured_host_prefix); - journal_directories[d++].path = strdupz(path); - } - - // terminate the list - journal_directories[d].path = NULL; - - // ------------------------------------------------------------------------ - +void journal_init_query_status(void) { function_query_status_dict = dictionary_create_advanced( DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(FUNCTION_QUERY_STATUS)); - - // ------------------------------------------------------------------------ - // initialize the used hashes files registry - - used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); - - - // ------------------------------------------------------------------------ - // initialize the journal files registry - - systemd_journal_session = (now_realtime_usec() / USEC_PER_SEC) * USEC_PER_SEC; - - journal_files_registry = dictionary_create_advanced( - DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, - NULL, sizeof(struct journal_file)); - - dictionary_register_insert_callback(journal_files_registry, files_registry_insert_cb, NULL); - dictionary_register_delete_callback(journal_files_registry, files_registry_delete_cb, NULL); - dictionary_register_conflict_callback(journal_files_registry, files_registry_conflict_cb, NULL); - - boot_ids_to_first_ut = dictionary_create_advanced( - DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, - NULL, sizeof(usec_t)); - - journal_files_registry_update(); - - - // ------------------------------------------------------------------------ - // debug - - if(argc == 2 && strcmp(argv[1], "debug") == 0) { - bool cancelled = false; - char buf[] = "systemd-journal after:-16000000 before:0 last:1"; - // char buf[] = "systemd-journal after:1695332964 before:1695937764 direction:backward last:100 slice:true source:all DHKucpqUoe1:PtVoyIuX.MU"; - // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403"; - function_systemd_journal("123", buf, 600, &cancelled); - exit(1); - } - - // ------------------------------------------------------------------------ - // the event loop for functions - - struct functions_evloop_globals *wg = - functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit); - - functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal, - SYSTEMD_JOURNAL_DEFAULT_TIMEOUT); - - - // ------------------------------------------------------------------------ - - time_t started_t = now_monotonic_sec(); - - size_t iteration = 0; - usec_t step = 1000 * USEC_PER_MS; - bool tty = isatty(fileno(stderr)) == 1; - - netdata_mutex_lock(&stdout_mutex); - fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n", - SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); - - heartbeat_t hb; - heartbeat_init(&hb); - while(!plugin_should_exit) { - iteration++; - - netdata_mutex_unlock(&stdout_mutex); - heartbeat_next(&hb, step); - netdata_mutex_lock(&stdout_mutex); - - if(!tty) - fprintf(stdout, "\n"); - - fflush(stdout); - - time_t now = now_monotonic_sec(); - if(now - started_t > 86400) - break; - } - - exit(0); } -- cgit v1.2.3