diff options
Diffstat (limited to 'src/collectors/systemd-journal.plugin/systemd-journal-sampling.h')
-rw-r--r-- | src/collectors/systemd-journal.plugin/systemd-journal-sampling.h | 378 |
1 files changed, 378 insertions, 0 deletions
diff --git a/src/collectors/systemd-journal.plugin/systemd-journal-sampling.h b/src/collectors/systemd-journal.plugin/systemd-journal-sampling.h new file mode 100644 index 000000000..0e1fed2d6 --- /dev/null +++ b/src/collectors/systemd-journal.plugin/systemd-journal-sampling.h @@ -0,0 +1,378 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_SYSTEMD_JOURNAL_SAMPLING_H +#define NETDATA_SYSTEMD_JOURNAL_SAMPLING_H + +// ---------------------------------------------------------------------------- +// sampling support + +static inline void sampling_query_init(LOGS_QUERY_STATUS *lqs, FACETS *facets) { + if(!lqs->rq.sampling) + return; + + if(!lqs->rq.slice) { + // the user is doing a full data query + // disable sampling + lqs->rq.sampling = 0; + return; + } + + if(lqs->rq.data_only) { + // the user is doing a data query + // disable sampling + lqs->rq.sampling = 0; + return; + } + + if(!lqs->c.files_matched) { + // no files have been matched + // disable sampling + lqs->rq.sampling = 0; + return; + } + + lqs->c.samples.slots = facets_histogram_slots(facets); + if(lqs->c.samples.slots < 2) + lqs->c.samples.slots = 2; + if(lqs->c.samples.slots > SYSTEMD_JOURNAL_SAMPLING_SLOTS) + lqs->c.samples.slots = SYSTEMD_JOURNAL_SAMPLING_SLOTS; + + if(!lqs->rq.after_ut || !lqs->rq.before_ut || lqs->rq.after_ut >= lqs->rq.before_ut) { + // we don't have enough information for sampling + lqs->rq.sampling = 0; + return; + } + + usec_t delta = lqs->rq.before_ut - lqs->rq.after_ut; + usec_t step = delta / facets_histogram_slots(facets) - 1; + if(step < 1) step = 1; + + lqs->c.samples_per_time_slot.start_ut = lqs->rq.after_ut; + lqs->c.samples_per_time_slot.end_ut = lqs->rq.before_ut; + lqs->c.samples_per_time_slot.step_ut = step; + + // the minimum number of rows to enable sampling + lqs->c.samples.enable_after_samples = lqs->rq.sampling / 2; + + size_t files_matched = lqs->c.files_matched; + if(!files_matched) + files_matched = 1; + + // the minimum number of rows per file to enable sampling + lqs->c.samples_per_file.enable_after_samples = (lqs->rq.sampling / 4) / files_matched; + if(lqs->c.samples_per_file.enable_after_samples < lqs->rq.entries) + lqs->c.samples_per_file.enable_after_samples = lqs->rq.entries; + + // the minimum number of rows per time slot to enable sampling + lqs->c.samples_per_time_slot.enable_after_samples = (lqs->rq.sampling / 4) / lqs->c.samples.slots; + if(lqs->c.samples_per_time_slot.enable_after_samples < lqs->rq.entries) + lqs->c.samples_per_time_slot.enable_after_samples = lqs->rq.entries; +} + +static inline void sampling_file_init(LOGS_QUERY_STATUS *lqs, struct journal_file *jf __maybe_unused) { + lqs->c.samples_per_file.sampled = 0; + lqs->c.samples_per_file.unsampled = 0; + lqs->c.samples_per_file.estimated = 0; + lqs->c.samples_per_file.every = 0; + lqs->c.samples_per_file.skipped = 0; + lqs->c.samples_per_file.recalibrate = 0; +} + +static inline size_t sampling_file_lines_scanned_so_far(LOGS_QUERY_STATUS *lqs) { + size_t sampled = lqs->c.samples_per_file.sampled + lqs->c.samples_per_file.unsampled; + if(!sampled) sampled = 1; + return sampled; +} + +static inline void sampling_running_file_query_overlapping_timeframe_ut( + LOGS_QUERY_STATUS *lqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, + usec_t msg_ut, usec_t *after_ut, usec_t *before_ut) { + + // find the overlap of the query and file timeframes + // taking into account the first message we encountered + + usec_t oldest_ut, newest_ut; + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) { + // the first message we know (oldest) + oldest_ut = lqs->c.query_file.first_msg_ut ? lqs->c.query_file.first_msg_ut : jf->msg_first_ut; + if(!oldest_ut) oldest_ut = lqs->c.query_file.start_ut; + + if(jf->msg_last_ut) + newest_ut = MIN(lqs->c.query_file.stop_ut, jf->msg_last_ut); + else if(jf->file_last_modified_ut) + newest_ut = MIN(lqs->c.query_file.stop_ut, jf->file_last_modified_ut); + else + newest_ut = lqs->c.query_file.stop_ut; + + if(msg_ut < oldest_ut) + oldest_ut = msg_ut - 1; + } + else /* BACKWARD */ { + // the latest message we know (newest) + newest_ut = lqs->c.query_file.first_msg_ut ? lqs->c.query_file.first_msg_ut : jf->msg_last_ut; + if(!newest_ut) newest_ut = lqs->c.query_file.start_ut; + + if(jf->msg_first_ut) + oldest_ut = MAX(lqs->c.query_file.stop_ut, jf->msg_first_ut); + else + oldest_ut = lqs->c.query_file.stop_ut; + + if(newest_ut < msg_ut) + newest_ut = msg_ut + 1; + } + + *after_ut = oldest_ut; + *before_ut = newest_ut; +} + +static inline double sampling_running_file_query_progress_by_time( + LOGS_QUERY_STATUS *lqs, struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { + + usec_t after_ut, before_ut, elapsed_ut; + sampling_running_file_query_overlapping_timeframe_ut(lqs, jf, direction, msg_ut, &after_ut, &before_ut); + + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) + elapsed_ut = msg_ut - after_ut; + else + elapsed_ut = before_ut - msg_ut; + + usec_t total_ut = before_ut - after_ut; + double progress = (double)elapsed_ut / (double)total_ut; + + return progress; +} + +static inline usec_t sampling_running_file_query_remaining_time( + LOGS_QUERY_STATUS *lqs, struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut, + usec_t *total_time_ut, usec_t *remaining_start_ut, + usec_t *remaining_end_ut) { + usec_t after_ut, before_ut; + sampling_running_file_query_overlapping_timeframe_ut(lqs, jf, direction, msg_ut, &after_ut, &before_ut); + + // since we have a timestamp in msg_ut + // this timestamp can extend the overlap + if(msg_ut <= after_ut) + after_ut = msg_ut - 1; + + if(msg_ut >= before_ut) + before_ut = msg_ut + 1; + + // return the remaining duration + usec_t remaining_from_ut, remaining_to_ut; + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) { + remaining_from_ut = msg_ut; + remaining_to_ut = before_ut; + } + else { + remaining_from_ut = after_ut; + remaining_to_ut = msg_ut; + } + + usec_t remaining_ut = remaining_to_ut - remaining_from_ut; + + if(total_time_ut) + *total_time_ut = (before_ut > after_ut) ? before_ut - after_ut : 1; + + if(remaining_start_ut) + *remaining_start_ut = remaining_from_ut; + + if(remaining_end_ut) + *remaining_end_ut = remaining_to_ut; + + return remaining_ut; +} + +static inline size_t sampling_running_file_query_estimate_remaining_lines_by_time( + LOGS_QUERY_STATUS *lqs, + struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, + usec_t msg_ut) { + size_t scanned_lines = sampling_file_lines_scanned_so_far(lqs); + + // Calculate the proportion of time covered + usec_t total_time_ut, remaining_start_ut, remaining_end_ut; + usec_t remaining_time_ut = sampling_running_file_query_remaining_time( + lqs, jf, direction, msg_ut, &total_time_ut, &remaining_start_ut, &remaining_end_ut); + if (total_time_ut == 0) total_time_ut = 1; + + double proportion_by_time = (double) (total_time_ut - remaining_time_ut) / (double) total_time_ut; + + if (proportion_by_time == 0 || proportion_by_time > 1.0 || !isfinite(proportion_by_time)) + proportion_by_time = 1.0; + + // Estimate the total number of lines in the file + size_t expected_matching_logs_by_time = (size_t)((double)scanned_lines / proportion_by_time); + + if(jf->messages_in_file && expected_matching_logs_by_time > jf->messages_in_file) + expected_matching_logs_by_time = jf->messages_in_file; + + // Calculate the estimated number of remaining lines + size_t remaining_logs_by_time = expected_matching_logs_by_time - scanned_lines; + if (remaining_logs_by_time < 1) remaining_logs_by_time = 1; + + // nd_log(NDLS_COLLECTORS, NDLP_INFO, + // "JOURNAL ESTIMATION: '%s' " + // "scanned_lines=%zu [sampled=%zu, unsampled=%zu, estimated=%zu], " + // "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], " + // "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], " + // "first message read from the file at %"PRIu64", current message at %"PRIu64", " + // "proportion of time %.2f %%, " + // "expected total lines in file %zu, " + // "remaining lines %zu, " + // "remaining time %"PRIu64" [%"PRIu64" - %"PRIu64", duration %"PRId64"]" + // , jf->filename + // , scanned_lines, fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated + // , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file + // , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut + // , fqs->query_file.first_msg_ut, msg_ut + // , proportion_by_time * 100.0 + // , expected_matching_logs_by_time + // , remaining_logs_by_time + // , remaining_time_ut, remaining_start_ut, remaining_end_ut, remaining_end_ut - remaining_start_ut + // ); + + return remaining_logs_by_time; +} + +static inline size_t sampling_running_file_query_estimate_remaining_lines( + sd_journal *j __maybe_unused, LOGS_QUERY_STATUS *lqs, struct journal_file *jf, + FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { + size_t remaining_logs_by_seqnum = 0; + +#ifdef HAVE_SD_JOURNAL_GET_SEQNUM + size_t expected_matching_logs_by_seqnum = 0; + double proportion_by_seqnum = 0.0; + uint64_t current_msg_seqnum; + sd_id128_t current_msg_writer; + if(!lqs->c.query_file.first_msg_seqnum || sd_journal_get_seqnum(j, ¤t_msg_seqnum, ¤t_msg_writer) < 0) { + lqs->c.query_file.first_msg_seqnum = 0; + lqs->c.query_file.first_msg_writer = SD_ID128_NULL; + } + else if(jf->messages_in_file) { + size_t scanned_lines = sampling_file_lines_scanned_so_far(lqs); + + double proportion_of_all_lines_so_far; + if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) + proportion_of_all_lines_so_far = (double)scanned_lines / (double)(current_msg_seqnum - jf->first_seqnum); + else + proportion_of_all_lines_so_far = (double)scanned_lines / (double)(jf->last_seqnum - current_msg_seqnum); + + if(proportion_of_all_lines_so_far > 1.0) + proportion_of_all_lines_so_far = 1.0; + + expected_matching_logs_by_seqnum = (size_t)(proportion_of_all_lines_so_far * (double)jf->messages_in_file); + + proportion_by_seqnum = (double)scanned_lines / (double)expected_matching_logs_by_seqnum; + + if (proportion_by_seqnum == 0 || proportion_by_seqnum > 1.0 || !isfinite(proportion_by_seqnum)) + proportion_by_seqnum = 1.0; + + remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines; + if(!remaining_logs_by_seqnum) remaining_logs_by_seqnum = 1; + } +#endif + + if(remaining_logs_by_seqnum) + return remaining_logs_by_seqnum; + + return sampling_running_file_query_estimate_remaining_lines_by_time(lqs, jf, direction, msg_ut); +} + +static inline void sampling_decide_file_sampling_every(sd_journal *j, + LOGS_QUERY_STATUS *lqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { + size_t files_matched = lqs->c.files_matched; + if(!files_matched) files_matched = 1; + + size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, lqs, jf, direction, msg_ut); + size_t wanted_samples = (lqs->rq.sampling / 2) / files_matched; + if(!wanted_samples) wanted_samples = 1; + + lqs->c.samples_per_file.every = remaining_lines / wanted_samples; + + if(lqs->c.samples_per_file.every < 1) + lqs->c.samples_per_file.every = 1; +} + +typedef enum { + SAMPLING_STOP_AND_ESTIMATE = -1, + SAMPLING_FULL = 0, + SAMPLING_SKIP_FIELDS = 1, +} sampling_t; + +static inline sampling_t is_row_in_sample( + sd_journal *j, LOGS_QUERY_STATUS *lqs, struct journal_file *jf, + usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction, bool candidate_to_keep) { + if(!lqs->rq.sampling || candidate_to_keep) + return SAMPLING_FULL; + + if(unlikely(msg_ut < lqs->c.samples_per_time_slot.start_ut)) + msg_ut = lqs->c.samples_per_time_slot.start_ut; + if(unlikely(msg_ut > lqs->c.samples_per_time_slot.end_ut)) + msg_ut = lqs->c.samples_per_time_slot.end_ut; + + size_t slot = (msg_ut - lqs->c.samples_per_time_slot.start_ut) / lqs->c.samples_per_time_slot.step_ut; + if(slot >= lqs->c.samples.slots) + slot = lqs->c.samples.slots - 1; + + bool should_sample = false; + + if(lqs->c.samples.sampled < lqs->c.samples.enable_after_samples || + lqs->c.samples_per_file.sampled < lqs->c.samples_per_file.enable_after_samples || + lqs->c.samples_per_time_slot.sampled[slot] < lqs->c.samples_per_time_slot.enable_after_samples) + should_sample = true; + + else if(lqs->c.samples_per_file.recalibrate >= SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE || !lqs->c.samples_per_file.every) { + // this is the first to be unsampled for this file + sampling_decide_file_sampling_every(j, lqs, jf, direction, msg_ut); + lqs->c.samples_per_file.recalibrate = 0; + should_sample = true; + } + else { + // we sample 1 every fqs->samples_per_file.every + if(lqs->c.samples_per_file.skipped >= lqs->c.samples_per_file.every) { + lqs->c.samples_per_file.skipped = 0; + should_sample = true; + } + else + lqs->c.samples_per_file.skipped++; + } + + if(should_sample) { + lqs->c.samples.sampled++; + lqs->c.samples_per_file.sampled++; + lqs->c.samples_per_time_slot.sampled[slot]++; + + return SAMPLING_FULL; + } + + lqs->c.samples_per_file.recalibrate++; + + lqs->c.samples.unsampled++; + lqs->c.samples_per_file.unsampled++; + lqs->c.samples_per_time_slot.unsampled[slot]++; + + if(lqs->c.samples_per_file.unsampled > lqs->c.samples_per_file.sampled) { + double progress_by_time = sampling_running_file_query_progress_by_time(lqs, jf, direction, msg_ut); + + if(progress_by_time > SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE) + return SAMPLING_STOP_AND_ESTIMATE; + } + + return SAMPLING_SKIP_FIELDS; +} + +static inline void sampling_update_running_query_file_estimates( + FACETS *facets, sd_journal *j, + LOGS_QUERY_STATUS *lqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction) { + usec_t total_time_ut, remaining_start_ut, remaining_end_ut; + sampling_running_file_query_remaining_time( + lqs, jf, direction, msg_ut, &total_time_ut, &remaining_start_ut, &remaining_end_ut); + size_t remaining_lines = sampling_running_file_query_estimate_remaining_lines(j, lqs, jf, direction, msg_ut); + facets_update_estimations(facets, remaining_start_ut, remaining_end_ut, remaining_lines); + lqs->c.samples.estimated += remaining_lines; + lqs->c.samples_per_file.estimated += remaining_lines; +} + +#endif //NETDATA_SYSTEMD_JOURNAL_SAMPLING_H |