summary | refs | log | tree | commit | diff | stats
path: root/collectors/systemd-journal.plugin/systemd-journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/systemd-journal.plugin/systemd-journal.c')
-rw-r--r-- collectors/systemd-journal.plugin/systemd-journal.c 2158
1 files changed, 753 insertions, 1405 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
index 877371120..f812b2161 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -5,13 +5,7 @@
* GPL v3+
*/
-#include "collectors/all.h"
-#include "libnetdata/libnetdata.h"
-#include "libnetdata/required_dummies.h"
-
-#include <linux/capability.h>
-#include <systemd/sd-journal.h>
-#include <syslog.h>
+#include "systemd-internals.h"
/*
* TODO
@@ -20,95 +14,17 @@
*
*/
-
-// ----------------------------------------------------------------------------
-// fstat64 overloading to speed up libsystemd
-// https://github.com/systemd/systemd/pull/29261
-
-#define ND_SD_JOURNAL_OPEN_FLAGS (0)
-
-#include <dlfcn.h>
-#include <sys/stat.h>
-
-#define FSTAT_CACHE_MAX 1024
-struct fdstat64_cache_entry {
- bool enabled;
- bool updated;
- int err_no;
- struct stat64 stat;
- int ret;
- size_t cached_count;
- size_t session;
-};
-
-struct fdstat64_cache_entry fstat64_cache[FSTAT_CACHE_MAX] = {0 };
-static __thread size_t fstat_thread_calls = 0;
-static __thread size_t fstat_thread_cached_responses = 0;
-static __thread bool enable_thread_fstat = false;
-static __thread size_t fstat_caching_thread_session = 0;
-static size_t fstat_caching_global_session = 0;
-
-static void fstat_cache_enable_on_thread(void) {
- fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_ACQUIRE);
- enable_thread_fstat = true;
-}
-
-static void fstat_cache_disable_on_thread(void) {
- fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_RELEASE);
- enable_thread_fstat = false;
-}
-
-int fstat64(int fd, struct stat64 *buf) {
- static int (*real_fstat)(int, struct stat64 *) = NULL;
- if (!real_fstat)
- real_fstat = dlsym(RTLD_NEXT, "fstat64");
-
- fstat_thread_calls++;
-
- if(fd >= 0 && fd < FSTAT_CACHE_MAX) {
- if(enable_thread_fstat && fstat64_cache[fd].session != fstat_caching_thread_session) {
- fstat64_cache[fd].session = fstat_caching_thread_session;
- fstat64_cache[fd].enabled = true;
- fstat64_cache[fd].updated = false;
- }
-
- if(fstat64_cache[fd].enabled && fstat64_cache[fd].updated && fstat64_cache[fd].session == fstat_caching_thread_session) {
- fstat_thread_cached_responses++;
- errno = fstat64_cache[fd].err_no;
- *buf = fstat64_cache[fd].stat;
- fstat64_cache[fd].cached_count++;
- return fstat64_cache[fd].ret;
- }
- }
-
- int ret = real_fstat(fd, buf);
-
- if(fd >= 0 && fd < FSTAT_CACHE_MAX && fstat64_cache[fd].enabled) {
- fstat64_cache[fd].ret = ret;
- fstat64_cache[fd].updated = true;
- fstat64_cache[fd].err_no = errno;
- fstat64_cache[fd].stat = *buf;
- fstat64_cache[fd].session = fstat_caching_thread_session;
- }
-
- return ret;
-}
-
-// ----------------------------------------------------------------------------
-
#define FACET_MAX_VALUE_LENGTH 8192
-#define SYSTEMD_JOURNAL_MAX_SOURCE_LEN 64
#define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries."
#define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal"
#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60
-#define SYSTEMD_JOURNAL_MAX_PARAMS 100
+#define SYSTEMD_JOURNAL_MAX_PARAMS 1000
#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600)
#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
-#define SYSTEMD_JOURNAL_WORKER_THREADS 5
-
-#define JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT (5 * USEC_PER_SEC) // assume always 5 seconds latency
-#define JOURNAL_VS_REALTIME_DELTA_MAX_UT (2 * 60 * USEC_PER_SEC) // up to 2 minutes latency
+#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING 1000000
+#define SYSTEMD_JOURNAL_SAMPLING_SLOTS 1000
+#define SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE 10000
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
@@ -128,6 +44,7 @@ int fstat64(int fd, struct stat64 *buf) {
#define JOURNAL_PARAMETER_SLICE "slice"
#define JOURNAL_PARAMETER_DELTA "delta"
#define JOURNAL_PARAMETER_TAIL "tail"
+#define JOURNAL_PARAMETER_SAMPLING "sampling"
#define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE"
#define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS"
@@ -138,7 +55,8 @@ int fstat64(int fd, struct stat64 *buf) {
#define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL
#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \
- "*MESSAGE*" \
+ "!MESSAGE_ID" \
+ "|*MESSAGE*" \
"|*_RAW" \
"|*_USEC" \
"|*_NSEC" \
@@ -153,7 +71,7 @@ int fstat64(int fd, struct stat64 *buf) {
/* --- USER JOURNAL FIELDS --- */ \
\
/* "|MESSAGE" */ \
- /* "|MESSAGE_ID" */ \
+ "|MESSAGE_ID" \
"|PRIORITY" \
"|CODE_FILE" \
/* "|CODE_LINE" */ \
@@ -247,33 +165,22 @@ int fstat64(int fd, struct stat64 *buf) {
"|IMAGE_NAME" /* undocumented */ \
/* "|CONTAINER_PARTIAL_MESSAGE" */ \
\
+ \
+ /* --- NETDATA --- */ \
+ \
+ "|ND_NIDL_NODE" \
+ "|ND_NIDL_CONTEXT" \
+ "|ND_LOG_SOURCE" \
+ /*"|ND_MODULE" */ \
+ "|ND_ALERT_NAME" \
+ "|ND_ALERT_CLASS" \
+ "|ND_ALERT_COMPONENT" \
+ "|ND_ALERT_TYPE" \
+ \
""
-static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
-static bool plugin_should_exit = false;
-
// ----------------------------------------------------------------------------
-typedef enum {
- ND_SD_JOURNAL_NO_FILE_MATCHED,
- ND_SD_JOURNAL_FAILED_TO_OPEN,
- ND_SD_JOURNAL_FAILED_TO_SEEK,
- ND_SD_JOURNAL_TIMED_OUT,
- ND_SD_JOURNAL_OK,
- ND_SD_JOURNAL_NOT_MODIFIED,
- ND_SD_JOURNAL_CANCELLED,
-} ND_SD_JOURNAL_STATUS;
-
-typedef enum {
- SDJF_ALL = 0,
- SDJF_LOCAL = (1 << 0),
- SDJF_REMOTE = (1 << 1),
- SDJF_SYSTEM = (1 << 2),
- SDJF_USER = (1 << 3),
- SDJF_NAMESPACE = (1 << 4),
- SDJF_OTHER = (1 << 5),
-} SD_JOURNAL_FILE_SOURCE_TYPE;
-
typedef struct function_query_status {
bool *cancelled; // a pointer to the cancelling boolean
usec_t stop_monotonic_ut;
@@ -282,7 +189,7 @@ typedef struct function_query_status {
// request
SD_JOURNAL_FILE_SOURCE_TYPE source_type;
- STRING *source;
+ SIMPLE_PATTERN *sources;
usec_t after_ut;
usec_t before_ut;
@@ -298,13 +205,50 @@ typedef struct function_query_status {
bool tail;
bool data_only;
bool slice;
+ size_t sampling;
size_t filters;
usec_t last_modified;
const char *query;
const char *histogram;
+ struct {
+ usec_t start_ut; // the starting time of the query - we start from this
+ usec_t stop_ut; // the ending time of the query - we stop at this
+ usec_t first_msg_ut;
+
+ sd_id128_t first_msg_writer;
+ uint64_t first_msg_seqnum;
+ } query_file;
+
+ struct {
+ uint32_t enable_after_samples;
+ uint32_t slots;
+ uint32_t sampled;
+ uint32_t unsampled;
+ uint32_t estimated;
+ } samples;
+
+ struct {
+ uint32_t enable_after_samples;
+ uint32_t every;
+ uint32_t skipped;
+ uint32_t recalibrate;
+ uint32_t sampled;
+ uint32_t unsampled;
+ uint32_t estimated;
+ } samples_per_file;
+
+ struct {
+ usec_t start_ut;
+ usec_t end_ut;
+ usec_t step_ut;
+ uint32_t enable_after_samples;
+ uint32_t sampled[SYSTEMD_JOURNAL_SAMPLING_SLOTS];
+ uint32_t unsampled[SYSTEMD_JOURNAL_SAMPLING_SLOTS];
+ } samples_per_time_slot;
+
// per file progress info
- size_t cached_count;
+ // size_t cached_count;
// progress statistics
usec_t matches_setup_ut;
@@ -315,20 +259,6 @@ typedef struct function_query_status {
size_t file_working;
} FUNCTION_QUERY_STATUS;
-struct journal_file {
- const char *filename;
- size_t filename_len;
- STRING *source;
- SD_JOURNAL_FILE_SOURCE_TYPE source_type;
- usec_t file_last_modified_ut;
- usec_t msg_first_ut;
- usec_t msg_last_ut;
- usec_t last_scan_ut;
- size_t size;
- bool logged_failure;
- usec_t max_journal_vs_realtime_delta_ut;
-};
-
static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) {
netdata_log_error("ERROR: %s, on query "
"timeframe [%"PRIu64" - %"PRIu64"], "
@@ -359,25 +289,369 @@ static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timesta
#define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP"
-static inline bool parse_journal_field(const char *data, size_t data_length, const char **key, size_t *key_length, const char **value, size_t *value_length) {
- const char *k = data;
- const char *equal = strchr(k, '=');
- if(unlikely(!equal))
- return false;
+// ----------------------------------------------------------------------------
+// sampling support
+
+// Prepare the sampling state of a query, or disable sampling (fqs->sampling = 0)
+// when it cannot be applied.
+//
+// Sampling remains enabled only for slice queries that are not data-only, that
+// matched at least one file and that have a valid (after_ut < before_ut) timeframe.
+// When it remains enabled, this sets the time slot geometry and the number of
+// rows that have to be fully processed (in total, per file and per time slot)
+// before rows may be skipped.
+static void sampling_query_init(FUNCTION_QUERY_STATUS *fqs, FACETS *facets) {
+    if(!fqs->sampling)
+        return;
- size_t kl = equal - k;
+    if(!fqs->slice) {
+        // the user is doing a full data query
+        // disable sampling
+        fqs->sampling = 0;
+        return;
+    }
- const char *v = ++equal;
- size_t vl = data_length - kl - 1;
+    if(fqs->data_only) {
+        // the user is doing a data query
+        // disable sampling
+        fqs->sampling = 0;
+        return;
+    }
- *key = k;
- *key_length = kl;
- *value = v;
- *value_length = vl;
+    if(!fqs->files_matched) {
+        // no files have been matched
+        // disable sampling
+        fqs->sampling = 0;
+        return;
+    }
- return true;
+    // keep the number of slots within the bounds of the per-slot counters
+    fqs->samples.slots = facets_histogram_slots(facets);
+    if(fqs->samples.slots < 2) fqs->samples.slots = 2;
+    if(fqs->samples.slots > SYSTEMD_JOURNAL_SAMPLING_SLOTS)
+        fqs->samples.slots = SYSTEMD_JOURNAL_SAMPLING_SLOTS;
+
+    if(!fqs->after_ut || !fqs->before_ut || fqs->after_ut >= fqs->before_ut) {
+        // we don't have enough information for sampling
+        fqs->sampling = 0;
+        return;
+    }
+
+    // Derive the slot duration from the clamped number of slots, so that the
+    // slot index computed per row maps evenly onto the counters, and so that
+    // a histogram without slots cannot cause a division by zero.
+    // The step is kept at 1 usec minimum; subtracting 1 from a zero quotient
+    // would wrap around, since usec_t is unsigned.
+    usec_t delta = fqs->before_ut - fqs->after_ut;
+    usec_t step = delta / fqs->samples.slots;
+    step = (step > 1) ? step - 1 : 1;
+
+    fqs->samples_per_time_slot.start_ut = fqs->after_ut;
+    fqs->samples_per_time_slot.end_ut = fqs->before_ut;
+    fqs->samples_per_time_slot.step_ut = step;
+
+    // the minimum number of rows to enable sampling
+    fqs->samples.enable_after_samples = fqs->sampling / 2;
+
+    // the minimum number of rows per file to enable sampling
+    // (files_matched is known to be non-zero at this point)
+    fqs->samples_per_file.enable_after_samples = (fqs->sampling / 4) / fqs->files_matched;
+    if(fqs->samples_per_file.enable_after_samples < fqs->entries)
+        fqs->samples_per_file.enable_after_samples = fqs->entries;
+
+    // the minimum number of rows per time slot to enable sampling
+    fqs->samples_per_time_slot.enable_after_samples = (fqs->sampling / 4) / fqs->samples.slots;
+    if(fqs->samples_per_time_slot.enable_after_samples < fqs->entries)
+        fqs->samples_per_time_slot.enable_after_samples = fqs->entries;
+}
+
+// Reset the per-file sampling counters before a new journal file is queried.
+// The per-file threshold (enable_after_samples) is left untouched.
+static void sampling_file_init(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf __maybe_unused) {
+    // row counters of the file
+    fqs->samples_per_file.estimated =
+    fqs->samples_per_file.unsampled =
+    fqs->samples_per_file.sampled = 0;
+
+    // state of the "1 every N" skipping and of its recalibration
+    fqs->samples_per_file.recalibrate =
+    fqs->samples_per_file.skipped =
+    fqs->samples_per_file.every = 0;
+}
+
+// Number of rows of the current file seen so far (sampled + unsampled).
+// Never returns zero, so the result can be used as a divisor.
+static size_t sampling_file_lines_scanned_so_far(FUNCTION_QUERY_STATUS *fqs) {
+    size_t scanned = fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled;
+    return scanned ? scanned : 1;
+}
+
+// Find the overlap of the query timeframe and the file timeframe,
+// taking into account the first message the running query encountered
+// and the timestamp of the current message (msg_ut).
+//
+// The overlap is returned in *after_ut (oldest) and *before_ut (newest).
+static void sampling_running_file_query_overlapping_timeframe_ut(
+        FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction,
+        usec_t msg_ut, usec_t *after_ut, usec_t *before_ut) {
+
+    usec_t window_from_ut, window_to_ut;
+
+    if(direction != FACETS_ANCHOR_DIRECTION_FORWARD) {
+        // BACKWARD: the query started at the newest message we know
+        window_to_ut = fqs->query_file.first_msg_ut;
+        if(!window_to_ut) window_to_ut = jf->msg_last_ut;
+        if(!window_to_ut) window_to_ut = fqs->query_file.start_ut;
+
+        // and it will stop at the query end, or at the first message of the file
+        window_from_ut = fqs->query_file.stop_ut;
+        if(jf->msg_first_ut)
+            window_from_ut = MAX(fqs->query_file.stop_ut, jf->msg_first_ut);
+
+        // the current message may be newer than what we assumed
+        if(msg_ut > window_to_ut)
+            window_to_ut = msg_ut + 1;
+    }
+    else {
+        // FORWARD: the query started at the oldest message we know
+        window_from_ut = fqs->query_file.first_msg_ut;
+        if(!window_from_ut) window_from_ut = jf->msg_first_ut;
+        if(!window_from_ut) window_from_ut = fqs->query_file.start_ut;
+
+        // and it will stop at the query end, or at the end of the file
+        // (last message if known, otherwise the file modification time)
+        usec_t file_end_ut = jf->msg_last_ut ? jf->msg_last_ut : jf->file_last_modified_ut;
+
+        window_to_ut = fqs->query_file.stop_ut;
+        if(file_end_ut)
+            window_to_ut = MIN(fqs->query_file.stop_ut, file_end_ut);
+
+        // the current message may be older than what we assumed
+        if(msg_ut < window_from_ut)
+            window_from_ut = msg_ut - 1;
+    }
+
+    *after_ut = window_from_ut;
+    *before_ut = window_to_ut;
+}
+
+// Progress of the running file query, as the fraction of the overlapping
+// query/file timeframe that lies between the starting edge of the query
+// (oldest when going forward, newest when going backward) and msg_ut.
+//
+// Returns 0.0 when the timeframe is empty or inverted, since there is no
+// basis for measuring progress then. All timestamps are unsigned, so every
+// subtraction is guarded against wrapping around.
+static double sampling_running_file_query_progress_by_time(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf,
+                                                           FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
+
+    usec_t after_ut, before_ut;
+    sampling_running_file_query_overlapping_timeframe_ut(fqs, jf, direction, msg_ut, &after_ut, &before_ut);
+
+    // avoid a division by zero (NaN / infinity) and a wrapped-around duration
+    if(before_ut <= after_ut)
+        return 0.0;
+
+    usec_t total_ut = before_ut - after_ut;
+
+    usec_t elapsed_ut;
+    if(direction == FACETS_ANCHOR_DIRECTION_FORWARD)
+        elapsed_ut = (msg_ut > after_ut) ? msg_ut - after_ut : 0;
+    else
+        elapsed_ut = (before_ut > msg_ut) ? before_ut - msg_ut : 0;
+
+    return (double)elapsed_ut / (double)total_ut;
+}
+
+// Return the duration between msg_ut and the edge of the overlapping
+// query/file timeframe the running query is heading to.
+//
+// Optional outputs (each may be NULL):
+//   total_time_ut      - the duration of the whole overlapping timeframe (at least 1)
+//   remaining_start_ut - the oldest timestamp of the remaining timeframe
+//   remaining_end_ut   - the newest timestamp of the remaining timeframe
+static usec_t sampling_running_file_query_remaining_time(FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf,
+                                                         FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut,
+                                                         usec_t *total_time_ut, usec_t *remaining_start_ut,
+                                                         usec_t *remaining_end_ut) {
+    usec_t window_from_ut, window_to_ut;
+    sampling_running_file_query_overlapping_timeframe_ut(fqs, jf, direction, msg_ut, &window_from_ut, &window_to_ut);
+
+    // msg_ut is a timestamp we have really seen,
+    // so it extends the overlap when it falls on or outside its edges
+    if(window_from_ut >= msg_ut)
+        window_from_ut = msg_ut - 1;
+
+    if(window_to_ut <= msg_ut)
+        window_to_ut = msg_ut + 1;
+
+    // what is left to be read depends on the direction we are moving to
+    bool forward = (direction == FACETS_ANCHOR_DIRECTION_FORWARD);
+    usec_t left_from_ut = forward ? msg_ut : window_from_ut;
+    usec_t left_to_ut = forward ? window_to_ut : msg_ut;
+
+    if(total_time_ut) {
+        if(window_to_ut > window_from_ut)
+            *total_time_ut = window_to_ut - window_from_ut;
+        else
+            *total_time_ut = 1;
+    }
+
+    if(remaining_start_ut)
+        *remaining_start_ut = left_from_ut;
+
+    if(remaining_end_ut)
+        *remaining_end_ut = left_to_ut;
+
+    return left_to_ut - left_from_ut;
+}
+
+// Estimate how many more rows the running file query would read, assuming
+// rows are evenly distributed over the overlapping query/file timeframe.
+//
+// The rows scanned so far are extrapolated by the proportion of the timeframe
+// already covered; the extrapolation is capped to the number of messages of
+// the file, when this is known. The result is at least 1.
+static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCTION_QUERY_STATUS *fqs,
+                                                                           struct journal_file *jf,
+                                                                           FACETS_ANCHOR_DIRECTION direction,
+                                                                           usec_t msg_ut) {
+    size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs);
+
+    // Calculate the proportion of time covered
+    usec_t total_time_ut;
+    usec_t remaining_time_ut = sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut,
+                                                                          &total_time_ut, NULL, NULL);
+    if (total_time_ut == 0) total_time_ut = 1;
+
+    double proportion_by_time = (double) (total_time_ut - remaining_time_ut) / (double) total_time_ut;
+
+    // an unusable proportion means we cannot extrapolate: assume all rows have been seen
+    if (proportion_by_time == 0 || proportion_by_time > 1.0 || !isfinite(proportion_by_time))
+        proportion_by_time = 1.0;
+
+    // Estimate the total number of lines in the file
+    size_t expected_matching_logs_by_time = (size_t)((double)scanned_lines / proportion_by_time);
+
+    if(jf->messages_in_file && expected_matching_logs_by_time > jf->messages_in_file)
+        expected_matching_logs_by_time = jf->messages_in_file;
+
+    // Calculate the estimated number of remaining lines.
+    // The cap above may bring the expectation below the rows already scanned;
+    // size_t is unsigned, so subtract only when the result is positive.
+    if(expected_matching_logs_by_time > scanned_lines)
+        return expected_matching_logs_by_time - scanned_lines;
+
+    return 1;
+}
+
+// Estimate how many more rows the running file query would read.
+//
+// When sequence numbers are available (the first message of the query has
+// one, the current message has one, and the file reports its message count),
+// the ratio of rows scanned to sequence numbers walked is applied to the
+// messages of the file. In every other case, including a current sequence
+// number outside the range known for the file, the estimation falls back
+// to the time based one. The result is at least 1.
+static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
+    size_t remaining_logs_by_seqnum = 0;
+
+#ifdef HAVE_SD_JOURNAL_GET_SEQNUM
+    uint64_t current_msg_seqnum;
+    sd_id128_t current_msg_writer;
+    if(!fqs->query_file.first_msg_seqnum || sd_journal_get_seqnum(j, &current_msg_seqnum, &current_msg_writer) < 0) {
+        fqs->query_file.first_msg_seqnum = 0;
+        fqs->query_file.first_msg_writer = SD_ID128_NULL;
+    }
+    else if(jf->messages_in_file) {
+        size_t scanned_lines = sampling_file_lines_scanned_so_far(fqs);
+
+        // sequence numbers are unsigned: make sure the current one is within
+        // the range known for the file before subtracting
+        bool seqnum_in_range;
+        uint64_t seqnums_walked = 0;
+        if(direction == FACETS_ANCHOR_DIRECTION_FORWARD) {
+            seqnum_in_range = (current_msg_seqnum >= jf->first_seqnum);
+            if(seqnum_in_range)
+                seqnums_walked = current_msg_seqnum - jf->first_seqnum;
+        }
+        else {
+            seqnum_in_range = (jf->last_seqnum >= current_msg_seqnum);
+            if(seqnum_in_range)
+                seqnums_walked = jf->last_seqnum - current_msg_seqnum;
+        }
+
+        if(seqnum_in_range) {
+            // the proportion of the messages walked, that matched the query
+            double proportion_of_all_lines_so_far = 1.0;
+            if(seqnums_walked)
+                proportion_of_all_lines_so_far = (double)scanned_lines / (double)seqnums_walked;
+
+            if(proportion_of_all_lines_so_far > 1.0)
+                proportion_of_all_lines_so_far = 1.0;
+
+            size_t expected_matching_logs_by_seqnum =
+                    (size_t)(proportion_of_all_lines_so_far * (double)jf->messages_in_file);
+
+            // the expectation may be lower than what we have already scanned;
+            // size_t is unsigned, so subtract only when the result is positive
+            if(expected_matching_logs_by_seqnum > scanned_lines)
+                remaining_logs_by_seqnum = expected_matching_logs_by_seqnum - scanned_lines;
+            else
+                remaining_logs_by_seqnum = 1;
+        }
+    }
+#endif
+
+    if(remaining_logs_by_seqnum)
+        return remaining_logs_by_seqnum;
+
+    return sampling_running_file_query_estimate_remaining_lines_by_time(fqs, jf, direction, msg_ut);
+}
+
+// Decide how many rows of the current file to skip between two sampled rows,
+// by spreading the per-file share of the sampling budget (half of the budget,
+// divided by the files matched) over the rows estimated to be remaining.
+// The result (samples_per_file.every) is at least 1.
+static void sampling_decide_file_sampling_every(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
+    size_t files = fqs->files_matched ? fqs->files_matched : 1;
+
+    size_t samples_budget = (fqs->sampling / 2) / files;
+    if(samples_budget < 1)
+        samples_budget = 1;
+
+    size_t lines_left = sampling_running_file_query_estimate_remaining_lines(j, fqs, jf, direction, msg_ut);
+
+    fqs->samples_per_file.every = lines_left / samples_budget;
+    if(!fqs->samples_per_file.every)
+        fqs->samples_per_file.every = 1;
+}
+
+// The verdict of is_row_in_sample() for a journal entry,
+// as acted upon by the forward and backward query loops.
+typedef enum {
+ SAMPLING_STOP_AND_ESTIMATE = -1, // update the estimations for the rest of the file and stop reading it
+ SAMPLING_FULL = 0, // process all the fields of the row
+ SAMPLING_SKIP_FIELDS = 1, // do not process the fields; account the row as unsampled
+} sampling_t;
+
+// Decide what to do with the journal entry having the timestamp msg_ut.
+//
+// Returns:
+// SAMPLING_FULL when sampling is disabled, when the caller flagged the row
+// as a candidate to keep, or when the row is selected as a sample
+// SAMPLING_SKIP_FIELDS when the row is not selected as a sample
+// SAMPLING_STOP_AND_ESTIMATE when the row is not selected, the file has more
+// unsampled than sampled rows, and the progress by time of the file query
+// exceeds SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE
+//
+// The global, per file and per time slot counters of fqs are updated
+// for every row that reaches the sampling decision.
+static inline sampling_t is_row_in_sample(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction, bool candidate_to_keep) {
+ if(!fqs->sampling || candidate_to_keep)
+ return SAMPLING_FULL;
+
+ // clamp the timestamp into the sampling timeframe, to find its time slot
+ if(unlikely(msg_ut < fqs->samples_per_time_slot.start_ut))
+ msg_ut = fqs->samples_per_time_slot.start_ut;
+ if(unlikely(msg_ut > fqs->samples_per_time_slot.end_ut))
+ msg_ut = fqs->samples_per_time_slot.end_ut;
+
+ // the index of the time slot of this row, limited to the last slot
+ size_t slot = (msg_ut - fqs->samples_per_time_slot.start_ut) / fqs->samples_per_time_slot.step_ut;
+ if(slot >= fqs->samples.slots)
+ slot = fqs->samples.slots - 1;
+
+ bool should_sample = false;
+
+ // every row is sampled, while any of the 3 counters
+ // (global, per file, per time slot) is below its threshold
+ if(fqs->samples.sampled < fqs->samples.enable_after_samples ||
+ fqs->samples_per_file.sampled < fqs->samples_per_file.enable_after_samples ||
+ fqs->samples_per_time_slot.sampled[slot] < fqs->samples_per_time_slot.enable_after_samples)
+ should_sample = true;
+
+ // (re)calculate the skipping interval when it has not been set yet for this file,
+ // or when SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE rows have been left unsampled since
+ // the last calculation; the row that triggers it is sampled
+ else if(fqs->samples_per_file.recalibrate >= SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE || !fqs->samples_per_file.every) {
+ // this is the first to be unsampled for this file
+ sampling_decide_file_sampling_every(j, fqs, jf, direction, msg_ut);
+ fqs->samples_per_file.recalibrate = 0;
+ should_sample = true;
+ }
+ else {
+ // we sample 1 every fqs->samples_per_file.every
+ if(fqs->samples_per_file.skipped >= fqs->samples_per_file.every) {
+ fqs->samples_per_file.skipped = 0;
+ should_sample = true;
+ }
+ else
+ fqs->samples_per_file.skipped++;
+ }
+
+ if(should_sample) {
+ fqs->samples.sampled++;
+ fqs->samples_per_file.sampled++;
+ fqs->samples_per_time_slot.sampled[slot]++;
+
+ return SAMPLING_FULL;
+ }
+
+ // the row is not sampled: account it
+ fqs->samples_per_file.recalibrate++;
+
+ fqs->samples.unsampled++;
+ fqs->samples_per_file.unsampled++;
+ fqs->samples_per_time_slot.unsampled[slot]++;
+
+ // when most rows of this file are unsampled, check if enough of the
+ // file timeframe has been read to stop reading and estimate the rest
+ if(fqs->samples_per_file.unsampled > fqs->samples_per_file.sampled) {
+ double progress_by_time = sampling_running_file_query_progress_by_time(fqs, jf, direction, msg_ut);
+
+ if(progress_by_time > SYSTEMD_JOURNAL_ENABLE_ESTIMATIONS_FILE_PERCENTAGE)
+ return SAMPLING_STOP_AND_ESTIMATE;
+ }
+
+ return SAMPLING_SKIP_FIELDS;
+}
+
+// Called when the query stops reading a file before its end: estimate the rows
+// left unread, report them to facets together with the timeframe they belong to,
+// and add them to the global and per-file estimated counters.
+static void sampling_update_running_query_file_estimates(FACETS *facets, sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, usec_t msg_ut, FACETS_ANCHOR_DIRECTION direction) {
+    // the timeframe that has not been read
+    usec_t unread_from_ut, unread_to_ut;
+    sampling_running_file_query_remaining_time(fqs, jf, direction, msg_ut, NULL, &unread_from_ut, &unread_to_ut);
+
+    // the rows expected in it
+    size_t unread_lines = sampling_running_file_query_estimate_remaining_lines(j, fqs, jf, direction, msg_ut);
+
+    facets_update_estimations(facets, unread_from_ut, unread_to_ut, unread_lines);
+
+    fqs->samples_per_file.estimated += unread_lines;
+    fqs->samples.estimated += unread_lines;
}
+// ----------------------------------------------------------------------------
+
static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) {
const void *data;
size_t length, bytes = 0;
@@ -454,11 +728,15 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut;
bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
+ fqs->query_file.start_ut = start_ut;
+ fqs->query_file.stop_ut = stop_ut;
+
if(!netdata_systemd_journal_seek_to(j, start_ut))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
- usec_t earliest_msg_ut = 0;
+ usec_t latest_msg_ut = 0; // the biggest timestamp we have seen so far
+ usec_t first_msg_ut = 0; // the first message we got from the db
size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
size_t bytes = 0, last_bytes = 0;
@@ -475,44 +753,68 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
continue;
}
- if(unlikely(msg_ut > earliest_msg_ut))
- earliest_msg_ut = msg_ut;
-
if (unlikely(msg_ut > start_ut))
continue;
if (unlikely(msg_ut < stop_ut))
break;
- bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
+ if(unlikely(msg_ut > latest_msg_ut))
+ latest_msg_ut = msg_ut;
- // make sure each line gets a unique timestamp
- if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
- msg_ut = --last_usec_from;
- else
- last_usec_from = last_usec_to = msg_ut;
-
- if(facets_row_finished(facets, msg_ut))
- rows_useful++;
-
- row_counter++;
- if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
- stop_when_full &&
- facets_rows(facets) >= fqs->entries)) {
- // stop the data only query
- usec_t oldest = facets_row_oldest_ut(facets);
- if(oldest && msg_ut < (oldest - anchor_delta))
- break;
+ if(unlikely(!first_msg_ut)) {
+ first_msg_ut = msg_ut;
+ fqs->query_file.first_msg_ut = msg_ut;
+
+#ifdef HAVE_SD_JOURNAL_GET_SEQNUM
+ if(sd_journal_get_seqnum(j, &fqs->query_file.first_msg_seqnum, &fqs->query_file.first_msg_writer) < 0) {
+ fqs->query_file.first_msg_seqnum = 0;
+ fqs->query_file.first_msg_writer = SD_ID128_NULL;
+ }
+#endif
}
- if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
- FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
- last_row_counter = row_counter;
+ sampling_t sample = is_row_in_sample(j, fqs, jf, msg_ut,
+ FACETS_ANCHOR_DIRECTION_BACKWARD,
+ facets_row_candidate_to_keep(facets, msg_ut));
+
+ if(sample == SAMPLING_FULL) {
+ bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
+
+ // make sure each line gets a unique timestamp
+ if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
+ msg_ut = --last_usec_from;
+ else
+ last_usec_from = last_usec_to = msg_ut;
+
+ if(facets_row_finished(facets, msg_ut))
+ rows_useful++;
+
+ row_counter++;
+ if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
+ stop_when_full &&
+ facets_rows(facets) >= fqs->entries)) {
+ // stop the data only query
+ usec_t oldest = facets_row_oldest_ut(facets);
+ if(oldest && msg_ut < (oldest - anchor_delta))
+ break;
+ }
+
+ if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
+ FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
+ last_row_counter = row_counter;
- FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
- last_bytes = bytes;
+ FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
+ last_bytes = bytes;
- status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ }
+ }
+ else if(sample == SAMPLING_SKIP_FIELDS)
+ facets_row_finished_unsampled(facets, msg_ut);
+ else {
+ sampling_update_running_query_file_estimates(facets, j, fqs, jf, msg_ut, FACETS_ANCHOR_DIRECTION_BACKWARD);
+ break;
}
}
@@ -524,8 +826,8 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
if(errors_no_timestamp)
netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
- if(earliest_msg_ut > fqs->last_modified)
- fqs->last_modified = earliest_msg_ut;
+ if(latest_msg_ut > fqs->last_modified)
+ fqs->last_modified = latest_msg_ut;
return status;
}
@@ -540,11 +842,15 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta;
bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
+ fqs->query_file.start_ut = start_ut;
+ fqs->query_file.stop_ut = stop_ut;
+
if(!netdata_systemd_journal_seek_to(j, start_ut))
return ND_SD_JOURNAL_FAILED_TO_SEEK;
size_t errors_no_timestamp = 0;
- usec_t earliest_msg_ut = 0;
+ usec_t latest_msg_ut = 0; // the biggest timestamp we have seen so far
+ usec_t first_msg_ut = 0; // the first message we got from the db
size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
size_t bytes = 0, last_bytes = 0;
@@ -561,44 +867,61 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
continue;
}
- if(likely(msg_ut > earliest_msg_ut))
- earliest_msg_ut = msg_ut;
-
if (unlikely(msg_ut < start_ut))
continue;
if (unlikely(msg_ut > stop_ut))
break;
- bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
+ if(likely(msg_ut > latest_msg_ut))
+ latest_msg_ut = msg_ut;
- // make sure each line gets a unique timestamp
- if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
- msg_ut = ++last_usec_to;
- else
- last_usec_from = last_usec_to = msg_ut;
-
- if(facets_row_finished(facets, msg_ut))
- rows_useful++;
-
- row_counter++;
- if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
- stop_when_full &&
- facets_rows(facets) >= fqs->entries)) {
- // stop the data only query
- usec_t newest = facets_row_newest_ut(facets);
- if(newest && msg_ut > (newest + anchor_delta))
- break;
+ if(unlikely(!first_msg_ut)) {
+ first_msg_ut = msg_ut;
+ fqs->query_file.first_msg_ut = msg_ut;
}
- if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
- FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
- last_row_counter = row_counter;
+ sampling_t sample = is_row_in_sample(j, fqs, jf, msg_ut,
+ FACETS_ANCHOR_DIRECTION_FORWARD,
+ facets_row_candidate_to_keep(facets, msg_ut));
+
+ if(sample == SAMPLING_FULL) {
+ bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
- FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
- last_bytes = bytes;
+ // make sure each line gets a unique timestamp
+ if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
+ msg_ut = ++last_usec_to;
+ else
+ last_usec_from = last_usec_to = msg_ut;
+
+ if(facets_row_finished(facets, msg_ut))
+ rows_useful++;
+
+ row_counter++;
+ if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
+ stop_when_full &&
+ facets_rows(facets) >= fqs->entries)) {
+ // stop the data only query
+ usec_t newest = facets_row_newest_ut(facets);
+ if(newest && msg_ut > (newest + anchor_delta))
+ break;
+ }
+
+ if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
+ FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
+ last_row_counter = row_counter;
+
+ FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
+ last_bytes = bytes;
- status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ }
+ }
+ else if(sample == SAMPLING_SKIP_FIELDS)
+ facets_row_finished_unsampled(facets, msg_ut);
+ else {
+ sampling_update_running_query_file_estimates(facets, j, fqs, jf, msg_ut, FACETS_ANCHOR_DIRECTION_FORWARD);
+ break;
}
}
@@ -610,8 +933,8 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
if(errors_no_timestamp)
netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
- if(earliest_msg_ut > fqs->last_modified)
- fqs->last_modified = earliest_msg_ut;
+ if(latest_msg_ut > fqs->last_modified)
+ fqs->last_modified = latest_msg_ut;
return status;
}
@@ -723,6 +1046,7 @@ static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file(
};
if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
+ netdata_log_error("JOURNAL: cannot open file '%s' for query", filename);
fstat_cache_disable_on_thread();
return ND_SD_JOURNAL_FAILED_TO_OPEN;
}
@@ -756,432 +1080,18 @@ static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file(
return status;
}
-// ----------------------------------------------------------------------------
-// journal files registry
-
-#define VAR_LOG_JOURNAL_MAX_DEPTH 10
-#define MAX_JOURNAL_DIRECTORIES 100
-
-struct journal_directory {
- char *path;
- bool logged_failure;
-};
-
-static struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES] = { 0 };
-static DICTIONARY *journal_files_registry = NULL;
-static DICTIONARY *used_hashes_registry = NULL;
-
-static usec_t systemd_journal_session = 0;
-
-static void buffer_json_journal_versions(BUFFER *wb) {
- buffer_json_member_add_object(wb, "versions");
- {
- buffer_json_member_add_uint64(wb, "sources",
- systemd_journal_session + dictionary_version(journal_files_registry));
- }
- buffer_json_object_close(wb);
-}
-
-static void journal_file_update_msg_ut(const char *filename, struct journal_file *jf) {
- fstat_cache_enable_on_thread();
-
- const char *files[2] = {
- [0] = filename,
- [1] = NULL,
- };
-
- sd_journal *j = NULL;
- if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
- fstat_cache_disable_on_thread();
-
- if(!jf->logged_failure) {
- netdata_log_error("cannot open journal file '%s', using file timestamps to understand time-frame.", filename);
- jf->logged_failure = true;
- }
-
- jf->msg_first_ut = 0;
- jf->msg_last_ut = jf->file_last_modified_ut;
- return;
- }
-
- usec_t first_ut = 0, last_ut = 0;
-
- if(sd_journal_seek_head(j) < 0 || sd_journal_next(j) < 0 || sd_journal_get_realtime_usec(j, &first_ut) < 0 || !first_ut) {
- internal_error(true, "cannot find the timestamp of the first message in '%s'", filename);
- first_ut = 0;
- }
-
- if(sd_journal_seek_tail(j) < 0 || sd_journal_previous(j) < 0 || sd_journal_get_realtime_usec(j, &last_ut) < 0 || !last_ut) {
- internal_error(true, "cannot find the timestamp of the last message in '%s'", filename);
- last_ut = jf->file_last_modified_ut;
- }
-
- sd_journal_close(j);
- fstat_cache_disable_on_thread();
-
- if(first_ut > last_ut) {
- internal_error(true, "timestamps are flipped in file '%s'", filename);
- usec_t t = first_ut;
- first_ut = last_ut;
- last_ut = t;
- }
-
- jf->msg_first_ut = first_ut;
- jf->msg_last_ut = last_ut;
-}
-
-static STRING *string_strdupz_source(const char *s, const char *e, size_t max_len, const char *prefix) {
- char buf[max_len];
- size_t len;
- char *dst = buf;
-
- if(prefix) {
- len = strlen(prefix);
- memcpy(buf, prefix, len);
- dst = &buf[len];
- max_len -= len;
- }
-
- len = e - s;
- if(len >= max_len)
- len = max_len - 1;
- memcpy(dst, s, len);
- dst[len] = '\0';
- buf[max_len - 1] = '\0';
-
- for(size_t i = 0; buf[i] ;i++)
- if(!isalnum(buf[i]) && buf[i] != '-' && buf[i] != '.' && buf[i] != ':')
- buf[i] = '_';
-
- return string_strdupz(buf);
-}
-
-static void files_registry_insert_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) {
- struct journal_file *jf = value;
- jf->filename = dictionary_acquired_item_name(item);
- jf->filename_len = strlen(jf->filename);
-
- // based on the filename
- // decide the source to show to the user
- const char *s = strrchr(jf->filename, '/');
- if(s) {
- if(strstr(jf->filename, "/remote/"))
- jf->source_type = SDJF_REMOTE;
- else {
- const char *t = s - 1;
- while(t >= jf->filename && *t != '.' && *t != '/')
- t--;
-
- if(t >= jf->filename && *t == '.') {
- jf->source_type = SDJF_NAMESPACE;
- jf->source = string_strdupz_source(t + 1, s, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "namespace-");
- }
- else
- jf->source_type = SDJF_LOCAL;
- }
-
- if(strncmp(s, "/system", 7) == 0)
- jf->source_type |= SDJF_SYSTEM;
-
- else if(strncmp(s, "/user", 5) == 0)
- jf->source_type |= SDJF_USER;
-
- else if(strncmp(s, "/remote-", 8) == 0) {
- jf->source_type |= SDJF_REMOTE;
-
- s = &s[8]; // skip "/remote-"
-
- char *e = strchr(s, '@');
- if(!e)
- e = strstr(s, ".journal");
-
- if(e) {
- const char *d = s;
- for(; d < e && (isdigit(*d) || *d == '.' || *d == ':') ; d++) ;
- if(d == e) {
- // a valid IP address
- char ip[e - s + 1];
- memcpy(ip, s, e - s);
- ip[e - s] = '\0';
- char buf[SYSTEMD_JOURNAL_MAX_SOURCE_LEN];
- if(ip_to_hostname(ip, buf, sizeof(buf)))
- jf->source = string_strdupz_source(buf, &buf[strlen(buf)], SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
- else {
- internal_error(true, "Cannot find the hostname for IP '%s'", ip);
- jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
- }
- }
- else
- jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
- }
- else
- jf->source_type |= SDJF_OTHER;
- }
- else
- jf->source_type |= SDJF_OTHER;
- }
- else
- jf->source_type = SDJF_LOCAL | SDJF_OTHER;
-
- journal_file_update_msg_ut(jf->filename, jf);
-
- internal_error(true,
- "found journal file '%s', type %d, source '%s', "
- "file modified: %"PRIu64", "
- "msg {first: %"PRIu64", last: %"PRIu64"}",
- jf->filename, jf->source_type, jf->source ? string2str(jf->source) : "<unset>",
- jf->file_last_modified_ut,
- jf->msg_first_ut, jf->msg_last_ut);
-}
-
-static bool files_registry_conflict_cb(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *data __maybe_unused) {
- struct journal_file *jf = old_value;
- struct journal_file *njf = new_value;
-
- if(njf->last_scan_ut > jf->last_scan_ut)
- jf->last_scan_ut = njf->last_scan_ut;
-
- if(njf->file_last_modified_ut > jf->file_last_modified_ut) {
- jf->file_last_modified_ut = njf->file_last_modified_ut;
- jf->size = njf->size;
-
- const char *filename = dictionary_acquired_item_name(item);
- journal_file_update_msg_ut(filename, jf);
-
-// internal_error(true,
-// "updated journal file '%s', type %d, "
-// "file modified: %"PRIu64", "
-// "msg {first: %"PRIu64", last: %"PRIu64"}",
-// filename, jf->source_type,
-// jf->file_last_modified_ut,
-// jf->msg_first_ut, jf->msg_last_ut);
- }
-
- return false;
-}
-
-#define SDJF_SOURCE_ALL_NAME "all"
-#define SDJF_SOURCE_LOCAL_NAME "all-local-logs"
-#define SDJF_SOURCE_LOCAL_SYSTEM_NAME "all-local-system-logs"
-#define SDJF_SOURCE_LOCAL_USERS_NAME "all-local-user-logs"
-#define SDJF_SOURCE_LOCAL_OTHER_NAME "all-uncategorized"
-#define SDJF_SOURCE_NAMESPACES_NAME "all-local-namespaces"
-#define SDJF_SOURCE_REMOTES_NAME "all-remote-systems"
-
-struct journal_file_source {
- usec_t first_ut;
- usec_t last_ut;
- size_t count;
- uint64_t size;
-};
-
-static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) {
- if(size > 1024ULL * 1024 * 1024 * 1024)
- snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0);
- else if(size > 1024ULL * 1024 * 1024)
- snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0);
- else if(size > 1024ULL * 1024)
- snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0);
- else if(size > 1024ULL)
- snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0);
- else
- snprintfz(dst, dst_len, "%"PRIu64" B", size);
-}
-
-#define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \
- if((remaining) > (duration)) { \
- uint64_t _count = (remaining) / (duration); \
- uint64_t _rem = (remaining) - (_count * (duration)); \
- (pos) += snprintfz(&(dst)[pos], (dst_len) - (pos), "%s%s%"PRIu64" %s", (printed) ? ", " : "", _rem ? "" : "and ", _count, _count > 1 ? (many) : (one)); \
- (remaining) = _rem; \
- (printed) = true; \
- } \
-} while(0)
-
-static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) {
- if(duration_s < 0)
- duration_s = -duration_s;
-
- size_t pos = 0;
- dst[0] = 0 ;
-
- bool printed = false;
- print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed);
- print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed);
- print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed);
- print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed);
- print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed);
- print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed);
-}
-
-static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) {
- struct journal_file_source *jfs = entry;
- BUFFER *wb = data;
-
- const char *name = dictionary_acquired_item_name(item);
-
- buffer_json_add_array_item_object(wb);
- {
- char size_for_humans[100];
- human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans));
-
- char duration_for_humans[1024];
- human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC),
- duration_for_humans, sizeof(duration_for_humans));
-
- char info[1024];
- snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s",
- jfs->count, size_for_humans, duration_for_humans);
-
- buffer_json_member_add_string(wb, "id", name);
- buffer_json_member_add_string(wb, "name", name);
- buffer_json_member_add_string(wb, "pill", size_for_humans);
- buffer_json_member_add_string(wb, "info", info);
- }
- buffer_json_object_close(wb); // options object
-
- return 1;
-}
-
-static bool journal_file_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) {
- struct journal_file_source *jfs = old_value, *njfs = new_value;
- jfs->count += njfs->count;
- jfs->size += njfs->size;
-
- if(njfs->first_ut && njfs->first_ut < jfs->first_ut)
- jfs->first_ut = njfs->first_ut;
-
- if(njfs->last_ut && njfs->last_ut > jfs->last_ut)
- jfs->last_ut = njfs->last_ut;
-
- return false;
-}
-
-static void available_journal_file_sources_to_json_array(BUFFER *wb) {
- DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE);
- dictionary_register_conflict_callback(dict, journal_file_merge_sizes, NULL);
-
- struct journal_file_source t = { 0 };
-
- struct journal_file *jf;
- dfe_start_read(journal_files_registry, jf) {
- t.first_ut = jf->msg_first_ut;
- t.last_ut = jf->msg_last_ut;
- t.count = 1;
- t.size = jf->size;
-
- dictionary_set(dict, SDJF_SOURCE_ALL_NAME, &t, sizeof(t));
-
- if((jf->source_type & (SDJF_LOCAL)) == (SDJF_LOCAL))
- dictionary_set(dict, SDJF_SOURCE_LOCAL_NAME, &t, sizeof(t));
- if((jf->source_type & (SDJF_LOCAL | SDJF_SYSTEM)) == (SDJF_LOCAL | SDJF_SYSTEM))
- dictionary_set(dict, SDJF_SOURCE_LOCAL_SYSTEM_NAME, &t, sizeof(t));
- if((jf->source_type & (SDJF_LOCAL | SDJF_USER)) == (SDJF_LOCAL | SDJF_USER))
- dictionary_set(dict, SDJF_SOURCE_LOCAL_USERS_NAME, &t, sizeof(t));
- if((jf->source_type & (SDJF_LOCAL | SDJF_OTHER)) == (SDJF_LOCAL | SDJF_OTHER))
- dictionary_set(dict, SDJF_SOURCE_LOCAL_OTHER_NAME, &t, sizeof(t));
- if((jf->source_type & (SDJF_NAMESPACE)) == (SDJF_NAMESPACE))
- dictionary_set(dict, SDJF_SOURCE_NAMESPACES_NAME, &t, sizeof(t));
- if((jf->source_type & (SDJF_REMOTE)) == (SDJF_REMOTE))
- dictionary_set(dict, SDJF_SOURCE_REMOTES_NAME, &t, sizeof(t));
- if(jf->source)
- dictionary_set(dict, string2str(jf->source), &t, sizeof(t));
- }
- dfe_done(jf);
-
- dictionary_sorted_walkthrough_read(dict, journal_file_to_json_array_cb, wb);
-
- dictionary_destroy(dict);
-}
-
-static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) {
- struct journal_file *jf = value; (void)jf;
- const char *filename = dictionary_acquired_item_name(item); (void)filename;
-
- string_freez(jf->source);
- internal_error(true, "removed journal file '%s'", filename);
-}
-
-void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_ut) {
- static const char *ext = ".journal";
- static const size_t ext_len = sizeof(".journal") - 1;
-
- if (depth > VAR_LOG_JOURNAL_MAX_DEPTH)
- return;
-
- DIR *dir;
- struct dirent *entry;
- struct stat info;
- char absolute_path[FILENAME_MAX];
-
- // Open the directory.
- if ((dir = opendir(dirname)) == NULL) {
- if(errno != ENOENT && errno != ENOTDIR)
- netdata_log_error("Cannot opendir() '%s'", dirname);
- return;
- }
-
- // Read each entry in the directory.
- while ((entry = readdir(dir)) != NULL) {
- snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name);
- if (stat(absolute_path, &info) != 0) {
- netdata_log_error("Failed to stat() '%s", absolute_path);
- continue;
- }
-
- if (S_ISDIR(info.st_mode)) {
- // If entry is a directory, call traverse recursively.
- if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0)
- journal_directory_scan(absolute_path, depth + 1, last_scan_ut);
-
- }
- else if (S_ISREG(info.st_mode)) {
- // If entry is a regular file, check if it ends with .journal.
- char *filename = entry->d_name;
- size_t len = strlen(filename);
-
- if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) {
- struct journal_file t = {
- .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC,
- .last_scan_ut = last_scan_ut,
- .size = info.st_size,
- .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
- };
- dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t));
- }
- }
- }
-
- closedir(dir);
-}
-
-static void journal_files_registry_update() {
- usec_t scan_ut = now_monotonic_usec();
-
- for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES ;i++) {
- if(!journal_directories[i].path)
- break;
-
- journal_directory_scan(journal_directories[i].path, 0, scan_ut);
- }
-
- struct journal_file *jf;
- dfe_start_write(journal_files_registry, jf) {
- if(jf->last_scan_ut < scan_ut)
- dictionary_del(journal_files_registry, jf_dfe.name);
- }
- dfe_done(jf);
-}
-
-// ----------------------------------------------------------------------------
-
static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
- if((fqs->source_type == SDJF_ALL || (jf->source_type & fqs->source_type) == fqs->source_type) &&
- (!fqs->source || fqs->source == jf->source)) {
+ if((fqs->source_type == SDJF_NONE && !fqs->sources) || (jf->source_type & fqs->source_type) ||
+ (fqs->sources && simple_pattern_matches(fqs->sources, string2str(jf->source)))) {
+
+ if(!jf->msg_first_ut || !jf->msg_last_ut)
+ // a timestamp is still unknown (file not scanned yet, or timestamps not updated) and a zero
+ // msg_first_ut would wrap around below - we can't tell if it can contribute, so let's add it.
+ return true;
usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
- usec_t first_ut = jf->msg_first_ut;
+ usec_t first_ut = jf->msg_first_ut - anchor_delta;
usec_t last_ut = jf->msg_last_ut + anchor_delta;
if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut)
@@ -1191,30 +1101,6 @@ static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
return false;
}
-static int journal_file_dict_items_backward_compar(const void *a, const void *b) {
- const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b;
- struct journal_file *jfa = dictionary_acquired_item_value(*ad);
- struct journal_file *jfb = dictionary_acquired_item_value(*bd);
-
- if(jfa->msg_last_ut < jfb->msg_last_ut)
- return 1;
-
- if(jfa->msg_last_ut > jfb->msg_last_ut)
- return -1;
-
- if(jfa->msg_first_ut < jfb->msg_first_ut)
- return 1;
-
- if(jfa->msg_first_ut > jfb->msg_first_ut)
- return -1;
-
- return 0;
-}
-
-static int journal_file_dict_items_forward_compar(const void *a, const void *b) {
- return -journal_file_dict_items_backward_compar(a, b);
-}
-
static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_NO_FILE_MATCHED;
struct journal_file *jf;
@@ -1260,8 +1146,12 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
}
bool partial = false;
- usec_t started_ut;
- usec_t ended_ut = now_monotonic_usec();
+ usec_t query_started_ut = now_monotonic_usec();
+ usec_t started_ut = query_started_ut;
+ usec_t ended_ut = started_ut;
+ usec_t duration_ut = 0, max_duration_ut = 0;
+
+ sampling_query_init(fqs, facets);
buffer_json_member_add_array(wb, "_journal_files");
for(size_t f = 0; f < files_used ;f++) {
@@ -1271,8 +1161,19 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
if(!jf_is_mine(jf, fqs))
continue;
+ started_ut = ended_ut;
+
+ // do not even try to do the query if we expect it to pass the timeout
+ if(ended_ut > (query_started_ut + (fqs->stop_monotonic_ut - query_started_ut) * 3 / 4) &&
+ ended_ut + max_duration_ut * 2 >= fqs->stop_monotonic_ut) {
+
+ partial = true;
+ status = ND_SD_JOURNAL_TIMED_OUT;
+ break;
+ }
+
fqs->file_working++;
- fqs->cached_count = 0;
+ // fqs->cached_count = 0;
size_t fs_calls = fstat_thread_calls;
size_t fs_cached = fstat_thread_cached_responses;
@@ -1281,8 +1182,22 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
size_t bytes_read = fqs->bytes_read;
size_t matches_setup_ut = fqs->matches_setup_ut;
+ sampling_file_init(fqs, jf);
+
ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs);
+// nd_log(NDLS_COLLECTORS, NDLP_INFO,
+// "JOURNAL ESTIMATION FINAL: '%s' "
+// "total lines %zu [sampled=%zu, unsampled=%zu, estimated=%zu], "
+// "file [%"PRIu64" - %"PRIu64", duration %"PRId64", known lines in file %zu], "
+// "query [%"PRIu64" - %"PRIu64", duration %"PRId64"], "
+// , jf->filename
+// , fqs->samples_per_file.sampled + fqs->samples_per_file.unsampled + fqs->samples_per_file.estimated
+// , fqs->samples_per_file.sampled, fqs->samples_per_file.unsampled, fqs->samples_per_file.estimated
+// , jf->msg_first_ut, jf->msg_last_ut, jf->msg_last_ut - jf->msg_first_ut, jf->messages_in_file
+// , fqs->query_file.start_ut, fqs->query_file.stop_ut, fqs->query_file.stop_ut - fqs->query_file.start_ut
+// );
+
rows_useful = fqs->rows_useful - rows_useful;
rows_read = fqs->rows_read - rows_read;
bytes_read = fqs->bytes_read - bytes_read;
@@ -1290,9 +1205,11 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
fs_calls = fstat_thread_calls - fs_calls;
fs_cached = fstat_thread_cached_responses - fs_cached;
- started_ut = ended_ut;
ended_ut = now_monotonic_usec();
- usec_t duration_ut = ended_ut - started_ut;
+ duration_ut = ended_ut - started_ut;
+
+ if(duration_ut > max_duration_ut)
+ max_duration_ut = duration_ut;
buffer_json_add_array_item_object(wb); // journal file
{
@@ -1315,6 +1232,16 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut);
buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls);
buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached);
+
+ if(fqs->sampling) {
+ buffer_json_member_add_object(wb, "_sampling");
+ {
+ buffer_json_member_add_uint64(wb, "sampled", fqs->samples_per_file.sampled);
+ buffer_json_member_add_uint64(wb, "unsampled", fqs->samples_per_file.unsampled);
+ buffer_json_member_add_uint64(wb, "estimated", fqs->samples_per_file.estimated);
+ }
+ buffer_json_object_close(wb); // _sampling
+ }
}
buffer_json_object_close(wb); // journal file
@@ -1384,6 +1311,64 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
buffer_json_member_add_boolean(wb, "partial", partial);
buffer_json_member_add_string(wb, "type", "table");
+ // build a message for the query
+ if(!fqs->data_only) {
+ CLEAN_BUFFER *msg = buffer_create(0, NULL);
+ CLEAN_BUFFER *msg_description = buffer_create(0, NULL);
+ ND_LOG_FIELD_PRIORITY msg_priority = NDLP_INFO;
+
+ if(!journal_files_completed_once()) {
+ buffer_strcat(msg, "Journals are still being scanned. ");
+ buffer_strcat(msg_description
+ , "LIBRARY SCAN: The journal files are still being scanned, you are probably viewing incomplete data. ");
+ msg_priority = NDLP_WARNING;
+ }
+
+ if(partial) {
+ buffer_strcat(msg, "Query timed-out, incomplete data. ");
+ buffer_strcat(msg_description
+ , "QUERY TIMEOUT: The query timed out and may not include all the data of the selected window. ");
+ msg_priority = NDLP_WARNING;
+ }
+
+ if(fqs->samples.estimated || fqs->samples.unsampled) {
+ double percent = (double) (fqs->samples.sampled * 100.0 /
+ (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled));
+ buffer_sprintf(msg, "%.2f%% real data", percent);
+ buffer_sprintf(msg_description, "ACTUAL DATA: The filters counters reflect %0.2f%% of the data. ", percent);
+ msg_priority = MIN(msg_priority, NDLP_NOTICE);
+ }
+
+ if(fqs->samples.unsampled) {
+ double percent = (double) (fqs->samples.unsampled * 100.0 /
+ (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled));
+ buffer_sprintf(msg, ", %.2f%% unsampled", percent);
+ buffer_sprintf(msg_description
+ , "UNSAMPLED DATA: %0.2f%% of the events exist and have been counted, but their values have not been evaluated, so they are not included in the filters counters. "
+ , percent);
+ msg_priority = MIN(msg_priority, NDLP_NOTICE);
+ }
+
+ if(fqs->samples.estimated) {
+ double percent = (double) (fqs->samples.estimated * 100.0 /
+ (fqs->samples.estimated + fqs->samples.unsampled + fqs->samples.sampled));
+ buffer_sprintf(msg, ", %.2f%% estimated", percent);
+ buffer_sprintf(msg_description
+ , "ESTIMATED DATA: The query selected a large amount of data, so to avoid delaying too much, the presented data are estimated by %0.2f%%. "
+ , percent);
+ msg_priority = MIN(msg_priority, NDLP_NOTICE);
+ }
+
+ buffer_json_member_add_object(wb, "message");
+ if(buffer_tostring(msg)) {
+ buffer_json_member_add_string(wb, "title", buffer_tostring(msg));
+ buffer_json_member_add_string(wb, "description", buffer_tostring(msg_description));
+ buffer_json_member_add_string(wb, "status", nd_log_id2priority(msg_priority));
+ }
+ // else send an empty object if there is nothing to tell
+ buffer_json_object_close(wb); // message
+ }
+
if(!fqs->data_only) {
buffer_json_member_add_time_t(wb, "update_every", 1);
buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
@@ -1403,6 +1388,17 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses);
}
buffer_json_object_close(wb); // _fstat_caching
+
+ if(fqs->sampling) {
+ buffer_json_member_add_object(wb, "_sampling");
+ {
+ buffer_json_member_add_uint64(wb, "sampled", fqs->samples.sampled);
+ buffer_json_member_add_uint64(wb, "unsampled", fqs->samples.unsampled);
+ buffer_json_member_add_uint64(wb, "estimated", fqs->samples.estimated);
+ }
+ buffer_json_object_close(wb); // _sampling
+ }
+
buffer_json_finalize(wb);
return HTTP_RESP_OK;
@@ -1471,6 +1467,10 @@ static void netdata_systemd_journal_function_help(const char *transaction) {
" The number of items to return.\n"
" The default is %d.\n"
"\n"
+ " "JOURNAL_PARAMETER_SAMPLING":ITEMS\n"
+ " The number of log entries to sample to estimate facets counters and histogram.\n"
+ " The default is %d.\n"
+ "\n"
" "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n"
" Return items relative to this timestamp.\n"
" The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n"
@@ -1511,6 +1511,7 @@ static void netdata_systemd_journal_function_help(const char *transaction) {
, JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice
, -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
, SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
+ , SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING
, JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward"
);
@@ -1521,572 +1522,6 @@ static void netdata_systemd_journal_function_help(const char *transaction) {
buffer_free(wb);
}
-const char *errno_map[] = {
- [1] = "1 (EPERM)", // "Operation not permitted",
- [2] = "2 (ENOENT)", // "No such file or directory",
- [3] = "3 (ESRCH)", // "No such process",
- [4] = "4 (EINTR)", // "Interrupted system call",
- [5] = "5 (EIO)", // "Input/output error",
- [6] = "6 (ENXIO)", // "No such device or address",
- [7] = "7 (E2BIG)", // "Argument list too long",
- [8] = "8 (ENOEXEC)", // "Exec format error",
- [9] = "9 (EBADF)", // "Bad file descriptor",
- [10] = "10 (ECHILD)", // "No child processes",
- [11] = "11 (EAGAIN)", // "Resource temporarily unavailable",
- [12] = "12 (ENOMEM)", // "Cannot allocate memory",
- [13] = "13 (EACCES)", // "Permission denied",
- [14] = "14 (EFAULT)", // "Bad address",
- [15] = "15 (ENOTBLK)", // "Block device required",
- [16] = "16 (EBUSY)", // "Device or resource busy",
- [17] = "17 (EEXIST)", // "File exists",
- [18] = "18 (EXDEV)", // "Invalid cross-device link",
- [19] = "19 (ENODEV)", // "No such device",
- [20] = "20 (ENOTDIR)", // "Not a directory",
- [21] = "21 (EISDIR)", // "Is a directory",
- [22] = "22 (EINVAL)", // "Invalid argument",
- [23] = "23 (ENFILE)", // "Too many open files in system",
- [24] = "24 (EMFILE)", // "Too many open files",
- [25] = "25 (ENOTTY)", // "Inappropriate ioctl for device",
- [26] = "26 (ETXTBSY)", // "Text file busy",
- [27] = "27 (EFBIG)", // "File too large",
- [28] = "28 (ENOSPC)", // "No space left on device",
- [29] = "29 (ESPIPE)", // "Illegal seek",
- [30] = "30 (EROFS)", // "Read-only file system",
- [31] = "31 (EMLINK)", // "Too many links",
- [32] = "32 (EPIPE)", // "Broken pipe",
- [33] = "33 (EDOM)", // "Numerical argument out of domain",
- [34] = "34 (ERANGE)", // "Numerical result out of range",
- [35] = "35 (EDEADLK)", // "Resource deadlock avoided",
- [36] = "36 (ENAMETOOLONG)", // "File name too long",
- [37] = "37 (ENOLCK)", // "No locks available",
- [38] = "38 (ENOSYS)", // "Function not implemented",
- [39] = "39 (ENOTEMPTY)", // "Directory not empty",
- [40] = "40 (ELOOP)", // "Too many levels of symbolic links",
- [42] = "42 (ENOMSG)", // "No message of desired type",
- [43] = "43 (EIDRM)", // "Identifier removed",
- [44] = "44 (ECHRNG)", // "Channel number out of range",
- [45] = "45 (EL2NSYNC)", // "Level 2 not synchronized",
- [46] = "46 (EL3HLT)", // "Level 3 halted",
- [47] = "47 (EL3RST)", // "Level 3 reset",
- [48] = "48 (ELNRNG)", // "Link number out of range",
- [49] = "49 (EUNATCH)", // "Protocol driver not attached",
- [50] = "50 (ENOCSI)", // "No CSI structure available",
- [51] = "51 (EL2HLT)", // "Level 2 halted",
- [52] = "52 (EBADE)", // "Invalid exchange",
- [53] = "53 (EBADR)", // "Invalid request descriptor",
- [54] = "54 (EXFULL)", // "Exchange full",
- [55] = "55 (ENOANO)", // "No anode",
- [56] = "56 (EBADRQC)", // "Invalid request code",
- [57] = "57 (EBADSLT)", // "Invalid slot",
- [59] = "59 (EBFONT)", // "Bad font file format",
- [60] = "60 (ENOSTR)", // "Device not a stream",
- [61] = "61 (ENODATA)", // "No data available",
- [62] = "62 (ETIME)", // "Timer expired",
- [63] = "63 (ENOSR)", // "Out of streams resources",
- [64] = "64 (ENONET)", // "Machine is not on the network",
- [65] = "65 (ENOPKG)", // "Package not installed",
- [66] = "66 (EREMOTE)", // "Object is remote",
- [67] = "67 (ENOLINK)", // "Link has been severed",
- [68] = "68 (EADV)", // "Advertise error",
- [69] = "69 (ESRMNT)", // "Srmount error",
- [70] = "70 (ECOMM)", // "Communication error on send",
- [71] = "71 (EPROTO)", // "Protocol error",
- [72] = "72 (EMULTIHOP)", // "Multihop attempted",
- [73] = "73 (EDOTDOT)", // "RFS specific error",
- [74] = "74 (EBADMSG)", // "Bad message",
- [75] = "75 (EOVERFLOW)", // "Value too large for defined data type",
- [76] = "76 (ENOTUNIQ)", // "Name not unique on network",
- [77] = "77 (EBADFD)", // "File descriptor in bad state",
- [78] = "78 (EREMCHG)", // "Remote address changed",
- [79] = "79 (ELIBACC)", // "Can not access a needed shared library",
- [80] = "80 (ELIBBAD)", // "Accessing a corrupted shared library",
- [81] = "81 (ELIBSCN)", // ".lib section in a.out corrupted",
- [82] = "82 (ELIBMAX)", // "Attempting to link in too many shared libraries",
- [83] = "83 (ELIBEXEC)", // "Cannot exec a shared library directly",
- [84] = "84 (EILSEQ)", // "Invalid or incomplete multibyte or wide character",
- [85] = "85 (ERESTART)", // "Interrupted system call should be restarted",
- [86] = "86 (ESTRPIPE)", // "Streams pipe error",
- [87] = "87 (EUSERS)", // "Too many users",
- [88] = "88 (ENOTSOCK)", // "Socket operation on non-socket",
- [89] = "89 (EDESTADDRREQ)", // "Destination address required",
- [90] = "90 (EMSGSIZE)", // "Message too long",
- [91] = "91 (EPROTOTYPE)", // "Protocol wrong type for socket",
- [92] = "92 (ENOPROTOOPT)", // "Protocol not available",
- [93] = "93 (EPROTONOSUPPORT)", // "Protocol not supported",
- [94] = "94 (ESOCKTNOSUPPORT)", // "Socket type not supported",
- [95] = "95 (ENOTSUP)", // "Operation not supported",
- [96] = "96 (EPFNOSUPPORT)", // "Protocol family not supported",
- [97] = "97 (EAFNOSUPPORT)", // "Address family not supported by protocol",
- [98] = "98 (EADDRINUSE)", // "Address already in use",
- [99] = "99 (EADDRNOTAVAIL)", // "Cannot assign requested address",
- [100] = "100 (ENETDOWN)", // "Network is down",
- [101] = "101 (ENETUNREACH)", // "Network is unreachable",
- [102] = "102 (ENETRESET)", // "Network dropped connection on reset",
- [103] = "103 (ECONNABORTED)", // "Software caused connection abort",
- [104] = "104 (ECONNRESET)", // "Connection reset by peer",
- [105] = "105 (ENOBUFS)", // "No buffer space available",
- [106] = "106 (EISCONN)", // "Transport endpoint is already connected",
- [107] = "107 (ENOTCONN)", // "Transport endpoint is not connected",
- [108] = "108 (ESHUTDOWN)", // "Cannot send after transport endpoint shutdown",
- [109] = "109 (ETOOMANYREFS)", // "Too many references: cannot splice",
- [110] = "110 (ETIMEDOUT)", // "Connection timed out",
- [111] = "111 (ECONNREFUSED)", // "Connection refused",
- [112] = "112 (EHOSTDOWN)", // "Host is down",
- [113] = "113 (EHOSTUNREACH)", // "No route to host",
- [114] = "114 (EALREADY)", // "Operation already in progress",
- [115] = "115 (EINPROGRESS)", // "Operation now in progress",
- [116] = "116 (ESTALE)", // "Stale file handle",
- [117] = "117 (EUCLEAN)", // "Structure needs cleaning",
- [118] = "118 (ENOTNAM)", // "Not a XENIX named type file",
- [119] = "119 (ENAVAIL)", // "No XENIX semaphores available",
- [120] = "120 (EISNAM)", // "Is a named type file",
- [121] = "121 (EREMOTEIO)", // "Remote I/O error",
- [122] = "122 (EDQUOT)", // "Disk quota exceeded",
- [123] = "123 (ENOMEDIUM)", // "No medium found",
- [124] = "124 (EMEDIUMTYPE)", // "Wrong medium type",
- [125] = "125 (ECANCELED)", // "Operation canceled",
- [126] = "126 (ENOKEY)", // "Required key not available",
- [127] = "127 (EKEYEXPIRED)", // "Key has expired",
- [128] = "128 (EKEYREVOKED)", // "Key has been revoked",
- [129] = "129 (EKEYREJECTED)", // "Key was rejected by service",
- [130] = "130 (EOWNERDEAD)", // "Owner died",
- [131] = "131 (ENOTRECOVERABLE)", // "State not recoverable",
- [132] = "132 (ERFKILL)", // "Operation not possible due to RF-kill",
- [133] = "133 (EHWPOISON)", // "Memory page has hardware error",
-};
-
-static const char *syslog_facility_to_name(int facility) {
- switch (facility) {
- case LOG_FAC(LOG_KERN): return "kern";
- case LOG_FAC(LOG_USER): return "user";
- case LOG_FAC(LOG_MAIL): return "mail";
- case LOG_FAC(LOG_DAEMON): return "daemon";
- case LOG_FAC(LOG_AUTH): return "auth";
- case LOG_FAC(LOG_SYSLOG): return "syslog";
- case LOG_FAC(LOG_LPR): return "lpr";
- case LOG_FAC(LOG_NEWS): return "news";
- case LOG_FAC(LOG_UUCP): return "uucp";
- case LOG_FAC(LOG_CRON): return "cron";
- case LOG_FAC(LOG_AUTHPRIV): return "authpriv";
- case LOG_FAC(LOG_FTP): return "ftp";
- case LOG_FAC(LOG_LOCAL0): return "local0";
- case LOG_FAC(LOG_LOCAL1): return "local1";
- case LOG_FAC(LOG_LOCAL2): return "local2";
- case LOG_FAC(LOG_LOCAL3): return "local3";
- case LOG_FAC(LOG_LOCAL4): return "local4";
- case LOG_FAC(LOG_LOCAL5): return "local5";
- case LOG_FAC(LOG_LOCAL6): return "local6";
- case LOG_FAC(LOG_LOCAL7): return "local7";
- default: return NULL;
- }
-}
-
-static const char *syslog_priority_to_name(int priority) {
- switch (priority) {
- case LOG_ALERT: return "alert";
- case LOG_CRIT: return "critical";
- case LOG_DEBUG: return "debug";
- case LOG_EMERG: return "panic";
- case LOG_ERR: return "error";
- case LOG_INFO: return "info";
- case LOG_NOTICE: return "notice";
- case LOG_WARNING: return "warning";
- default: return NULL;
- }
-}
-
-static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets __maybe_unused, FACET_ROW *row, void *data __maybe_unused) {
- // same to
- // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501
- // function get_log_colors()
-
- FACET_ROW_KEY_VALUE *priority_rkv = dictionary_get(row->dict, "PRIORITY");
- if(!priority_rkv || priority_rkv->empty)
- return FACET_ROW_SEVERITY_NORMAL;
-
- int priority = str2i(buffer_tostring(priority_rkv->wb));
-
- if(priority <= LOG_ERR)
- return FACET_ROW_SEVERITY_CRITICAL;
-
- else if (priority <= LOG_WARNING)
- return FACET_ROW_SEVERITY_WARNING;
-
- else if(priority <= LOG_NOTICE)
- return FACET_ROW_SEVERITY_NOTICE;
-
- else if(priority >= LOG_DEBUG)
- return FACET_ROW_SEVERITY_DEBUG;
-
- return FACET_ROW_SEVERITY_NORMAL;
-}
-
-static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
- static __thread char tmp[1024 + 1];
- struct passwd pw, *result = NULL;
-
- if (getpwuid_r(uid, &pw, tmp, sizeof(tmp), &result) != 0 || !result || !pw.pw_name || !(*pw.pw_name))
- snprintfz(buffer, buffer_size - 1, "%u", uid);
- else
- snprintfz(buffer, buffer_size - 1, "%u (%s)", uid, pw.pw_name);
-
- return buffer;
-}
-
-static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
- static __thread char tmp[1024];
- struct group grp, *result = NULL;
-
- if (getgrgid_r(gid, &grp, tmp, sizeof(tmp), &result) != 0 || !result || !grp.gr_name || !(*grp.gr_name))
- snprintfz(buffer, buffer_size - 1, "%u", gid);
- else
- snprintfz(buffer, buffer_size - 1, "%u (%s)", gid, grp.gr_name);
-
- return buffer;
-}
-
-static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- int facility = str2i(buffer_tostring(wb));
- const char *name = syslog_facility_to_name(facility);
- if (name) {
- buffer_flush(wb);
- buffer_strcat(wb, name);
- }
- }
-}
-
-static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- if(scope == FACETS_TRANSFORM_FACET_SORT)
- return;
-
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- int priority = str2i(buffer_tostring(wb));
- const char *name = syslog_priority_to_name(priority);
- if (name) {
- buffer_flush(wb);
- buffer_strcat(wb, name);
- }
- }
-}
-
-static void netdata_systemd_journal_transform_errno(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- if(scope == FACETS_TRANSFORM_FACET_SORT)
- return;
-
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- unsigned err_no = str2u(buffer_tostring(wb));
- if(err_no > 0 && err_no < sizeof(errno_map) / sizeof(*errno_map)) {
- const char *name = errno_map[err_no];
- if(name) {
- buffer_flush(wb);
- buffer_strcat(wb, name);
- }
- }
- }
-}
-
-// ----------------------------------------------------------------------------
-// UID and GID transformation
-
-#define UID_GID_HASHTABLE_SIZE 10000
-
-struct word_t2str_hashtable_entry {
- struct word_t2str_hashtable_entry *next;
- Word_t hash;
- size_t len;
- char str[];
-};
-
-struct word_t2str_hashtable {
- SPINLOCK spinlock;
- size_t size;
- struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE];
-};
-
-struct word_t2str_hashtable uid_hashtable = {
- .size = UID_GID_HASHTABLE_SIZE,
-};
-
-struct word_t2str_hashtable gid_hashtable = {
- .size = UID_GID_HASHTABLE_SIZE,
-};
-
-struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) {
- size_t slot = hash % ht->size;
- struct word_t2str_hashtable_entry **e = &ht->hashtable[slot];
-
- while(*e && (*e)->hash != hash)
- e = &((*e)->next);
-
- return e;
-}
-
-const char *uid_to_username_cached(uid_t uid, size_t *length) {
- spinlock_lock(&uid_hashtable.spinlock);
-
- struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid);
- if(!(*e)) {
- static __thread char buf[1024];
- const char *name = uid_to_username(uid, buf, sizeof(buf));
- size_t size = strlen(name) + 1;
-
- *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
- (*e)->len = size - 1;
- (*e)->hash = uid;
- memcpy((*e)->str, name, size);
- }
-
- spinlock_unlock(&uid_hashtable.spinlock);
-
- *length = (*e)->len;
- return (*e)->str;
-}
-
-const char *gid_to_groupname_cached(gid_t gid, size_t *length) {
- spinlock_lock(&gid_hashtable.spinlock);
-
- struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid);
- if(!(*e)) {
- static __thread char buf[1024];
- const char *name = gid_to_groupname(gid, buf, sizeof(buf));
- size_t size = strlen(name) + 1;
-
- *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
- (*e)->len = size - 1;
- (*e)->hash = gid;
- memcpy((*e)->str, name, size);
- }
-
- spinlock_unlock(&gid_hashtable.spinlock);
-
- *length = (*e)->len;
- return (*e)->str;
-}
-
-DICTIONARY *boot_ids_to_first_ut = NULL;
-
-static void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- const char *boot_id = buffer_tostring(wb);
- if(*boot_id && isxdigit(*boot_id)) {
- usec_t ut = UINT64_MAX;
- usec_t *p_ut = dictionary_get(boot_ids_to_first_ut, boot_id);
- if(!p_ut) {
- struct journal_file *jf;
- dfe_start_read(journal_files_registry, jf) {
- const char *files[2] = {
- [0] = jf_dfe.name,
- [1] = NULL,
- };
-
- sd_journal *j = NULL;
- if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j)
- continue;
-
- char m[100];
- size_t len = snprintfz(m, sizeof(m), "_BOOT_ID=%s", boot_id);
- usec_t t_ut = 0;
- if(sd_journal_add_match(j, m, len) < 0 ||
- sd_journal_seek_head(j) < 0 ||
- sd_journal_next(j) < 0 ||
- sd_journal_get_realtime_usec(j, &t_ut) < 0 || !t_ut) {
- sd_journal_close(j);
- continue;
- }
-
- if(t_ut < ut)
- ut = t_ut;
-
- sd_journal_close(j);
- }
- dfe_done(jf);
-
- dictionary_set(boot_ids_to_first_ut, boot_id, &ut, sizeof(ut));
- }
- else
- ut = *p_ut;
-
- if(ut != UINT64_MAX) {
- time_t timestamp_sec = (time_t)(ut / USEC_PER_SEC);
- struct tm tm;
- char buffer[30];
-
- gmtime_r(&timestamp_sec, &tm);
- strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm);
-
- switch(scope) {
- default:
- case FACETS_TRANSFORM_DATA:
- case FACETS_TRANSFORM_VALUE:
- buffer_sprintf(wb, " (%s UTC) ", buffer);
- break;
-
- case FACETS_TRANSFORM_FACET:
- case FACETS_TRANSFORM_FACET_SORT:
- case FACETS_TRANSFORM_HISTOGRAM:
- buffer_flush(wb);
- buffer_sprintf(wb, "%s UTC", buffer);
- break;
- }
- }
- }
-}
-
-static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- if(scope == FACETS_TRANSFORM_FACET_SORT)
- return;
-
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- uid_t uid = str2i(buffer_tostring(wb));
- size_t len;
- const char *name = uid_to_username_cached(uid, &len);
- buffer_contents_replace(wb, name, len);
- }
-}
-
-static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- if(scope == FACETS_TRANSFORM_FACET_SORT)
- return;
-
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- gid_t gid = str2i(buffer_tostring(wb));
- size_t len;
- const char *name = gid_to_groupname_cached(gid, &len);
- buffer_contents_replace(wb, name, len);
- }
-}
-
-const char *linux_capabilities[] = {
- [CAP_CHOWN] = "CHOWN",
- [CAP_DAC_OVERRIDE] = "DAC_OVERRIDE",
- [CAP_DAC_READ_SEARCH] = "DAC_READ_SEARCH",
- [CAP_FOWNER] = "FOWNER",
- [CAP_FSETID] = "FSETID",
- [CAP_KILL] = "KILL",
- [CAP_SETGID] = "SETGID",
- [CAP_SETUID] = "SETUID",
- [CAP_SETPCAP] = "SETPCAP",
- [CAP_LINUX_IMMUTABLE] = "LINUX_IMMUTABLE",
- [CAP_NET_BIND_SERVICE] = "NET_BIND_SERVICE",
- [CAP_NET_BROADCAST] = "NET_BROADCAST",
- [CAP_NET_ADMIN] = "NET_ADMIN",
- [CAP_NET_RAW] = "NET_RAW",
- [CAP_IPC_LOCK] = "IPC_LOCK",
- [CAP_IPC_OWNER] = "IPC_OWNER",
- [CAP_SYS_MODULE] = "SYS_MODULE",
- [CAP_SYS_RAWIO] = "SYS_RAWIO",
- [CAP_SYS_CHROOT] = "SYS_CHROOT",
- [CAP_SYS_PTRACE] = "SYS_PTRACE",
- [CAP_SYS_PACCT] = "SYS_PACCT",
- [CAP_SYS_ADMIN] = "SYS_ADMIN",
- [CAP_SYS_BOOT] = "SYS_BOOT",
- [CAP_SYS_NICE] = "SYS_NICE",
- [CAP_SYS_RESOURCE] = "SYS_RESOURCE",
- [CAP_SYS_TIME] = "SYS_TIME",
- [CAP_SYS_TTY_CONFIG] = "SYS_TTY_CONFIG",
- [CAP_MKNOD] = "MKNOD",
- [CAP_LEASE] = "LEASE",
- [CAP_AUDIT_WRITE] = "AUDIT_WRITE",
- [CAP_AUDIT_CONTROL] = "AUDIT_CONTROL",
- [CAP_SETFCAP] = "SETFCAP",
- [CAP_MAC_OVERRIDE] = "MAC_OVERRIDE",
- [CAP_MAC_ADMIN] = "MAC_ADMIN",
- [CAP_SYSLOG] = "SYSLOG",
- [CAP_WAKE_ALARM] = "WAKE_ALARM",
- [CAP_BLOCK_SUSPEND] = "BLOCK_SUSPEND",
- [37 /*CAP_AUDIT_READ*/] = "AUDIT_READ",
- [38 /*CAP_PERFMON*/] = "PERFMON",
- [39 /*CAP_BPF*/] = "BPF",
- [40 /* CAP_CHECKPOINT_RESTORE */] = "CHECKPOINT_RESTORE",
-};
-
-static void netdata_systemd_journal_transform_cap_effective(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- if(scope == FACETS_TRANSFORM_FACET_SORT)
- return;
-
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- uint64_t cap = strtoul(buffer_tostring(wb), NULL, 16);
- if(cap) {
- buffer_fast_strcat(wb, " (", 2);
- for (size_t i = 0, added = 0; i < sizeof(linux_capabilities) / sizeof(linux_capabilities[0]); i++) {
- if (linux_capabilities[i] && (cap & (1ULL << i))) {
-
- if (added)
- buffer_fast_strcat(wb, " | ", 3);
-
- buffer_strcat(wb, linux_capabilities[i]);
- added++;
- }
- }
- buffer_fast_strcat(wb, ")", 1);
- }
- }
-}
-
-static void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
- if(scope == FACETS_TRANSFORM_FACET_SORT)
- return;
-
- const char *v = buffer_tostring(wb);
- if(*v && isdigit(*v)) {
- uint64_t ut = str2ull(buffer_tostring(wb), NULL);
- if(ut) {
- time_t timestamp_sec = ut / USEC_PER_SEC;
- struct tm tm;
- char buffer[30];
-
- gmtime_r(&timestamp_sec, &tm);
- strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm);
- buffer_sprintf(wb, " (%s.%06llu UTC)", buffer, ut % USEC_PER_SEC);
- }
- }
-}
-
-// ----------------------------------------------------------------------------
-
-static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
- FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
- const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
-
- const char *identifier = NULL;
- FACET_ROW_KEY_VALUE *container_name_rkv = dictionary_get(row->dict, "CONTAINER_NAME");
- if(container_name_rkv && !container_name_rkv->empty)
- identifier = buffer_tostring(container_name_rkv->wb);
-
- if(!identifier) {
- FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER");
- if(syslog_identifier_rkv && !syslog_identifier_rkv->empty)
- identifier = buffer_tostring(syslog_identifier_rkv->wb);
-
- if(!identifier) {
- FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM");
- if(comm_rkv && !comm_rkv->empty)
- identifier = buffer_tostring(comm_rkv->wb);
- }
- }
-
- buffer_flush(rkv->wb);
-
- if(!identifier)
- buffer_strcat(rkv->wb, FACET_VALUE_UNSET);
- else
- buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid);
-
- buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
-}
-
-static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) {
- buffer_json_add_array_item_object(json_array);
- buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
- buffer_json_object_close(json_array);
-}
-
DICTIONARY *function_query_status_dict = NULL;
static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) {
@@ -2129,7 +1564,7 @@ static void function_systemd_journal_progress(BUFFER *wb, const char *transactio
buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut);
buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched);
char msg[1024 + 1];
- snprintfz(msg, 1024,
+ snprintfz(msg, sizeof(msg) - 1,
"Read %zu rows (%0.0f rows/s), "
"data %0.1f MB (%0.1f MB/s), "
"file %zu of %zu",
@@ -2147,10 +1582,9 @@ static void function_systemd_journal_progress(BUFFER *wb, const char *transactio
dictionary_acquired_item_release(function_query_status_dict, item);
}
-static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
+void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
fstat_thread_calls = 0;
fstat_thread_cached_responses = 0;
- journal_files_registry_update();
BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
@@ -2186,6 +1620,7 @@ static void function_systemd_journal(const char *transaction, char *function, in
facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS);
facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA);
facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_SAMPLING);
#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE);
@@ -2196,10 +1631,10 @@ static void function_systemd_journal(const char *transaction, char *function, in
facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL);
facets_register_key_name(facets, "_HOSTNAME",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE);
facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS,
- FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
+ FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE,
netdata_systemd_journal_dynamic_row_id, NULL);
facets_register_key_name(facets, "MESSAGE",
@@ -2212,71 +1647,78 @@ static void function_systemd_journal(const char *transaction, char *function, in
// netdata_systemd_journal_rich_message, NULL);
facets_register_key_name_transformation(facets, "PRIORITY",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW |
+ FACET_KEY_OPTION_EXPANDED_FILTER,
netdata_systemd_journal_transform_priority, NULL);
facets_register_key_name_transformation(facets, "SYSLOG_FACILITY",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW |
+ FACET_KEY_OPTION_EXPANDED_FILTER,
netdata_systemd_journal_transform_syslog_facility, NULL);
facets_register_key_name_transformation(facets, "ERRNO",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_errno, NULL);
facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE,
FACET_KEY_OPTION_NEVER_FACET);
facets_register_key_name(facets, "SYSLOG_IDENTIFIER",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
+ FACET_KEY_OPTION_FACET);
facets_register_key_name(facets, "UNIT",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
+ FACET_KEY_OPTION_FACET);
facets_register_key_name(facets, "USER_UNIT",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
+ FACET_KEY_OPTION_FACET);
+
+ facets_register_key_name_transformation(facets, "MESSAGE_ID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW |
+ FACET_KEY_OPTION_EXPANDED_FILTER,
+ netdata_systemd_journal_transform_message_id, NULL);
facets_register_key_name_transformation(facets, "_BOOT_ID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_boot_id, NULL);
facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "_UID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "OBJECT_UID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "_GID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_gid, NULL);
facets_register_key_name_transformation(facets, "OBJECT_GID",
- FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_gid, NULL);
facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE",
- FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_cap_effective, NULL);
facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID",
- FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID",
- FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_uid, NULL);
facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP",
- FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ FACET_KEY_OPTION_TRANSFORM_VIEW,
netdata_systemd_journal_transform_timestamp_usec, NULL);
// ------------------------------------------------------------------------
@@ -2290,10 +1732,11 @@ static void function_systemd_journal(const char *transaction, char *function, in
FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION;
const char *query = NULL;
const char *chart = NULL;
- const char *source = NULL;
+ SIMPLE_PATTERN *sources = NULL;
const char *progress_id = NULL;
SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL;
size_t filters = 0;
+ size_t sampling = SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING;
buffer_json_member_add_object(wb, "_request");
@@ -2329,6 +1772,9 @@ static void function_systemd_journal(const char *transaction, char *function, in
else
tail = true;
}
+ else if(strncmp(keyword, JOURNAL_PARAMETER_SAMPLING ":", sizeof(JOURNAL_PARAMETER_SAMPLING ":") - 1) == 0) {
+ sampling = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_SAMPLING ":") - 1]);
+ }
else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) {
char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1];
@@ -2352,40 +1798,67 @@ static void function_systemd_journal(const char *transaction, char *function, in
progress_id = id;
}
else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
- source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
+ const char *value = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
- if(strcmp(source, SDJF_SOURCE_ALL_NAME) == 0) {
- source_type = SDJF_ALL;
- source = NULL;
- }
- else if(strcmp(source, SDJF_SOURCE_LOCAL_NAME) == 0) {
- source_type = SDJF_LOCAL;
- source = NULL;
- }
- else if(strcmp(source, SDJF_SOURCE_REMOTES_NAME) == 0) {
- source_type = SDJF_REMOTE;
- source = NULL;
- }
- else if(strcmp(source, SDJF_SOURCE_NAMESPACES_NAME) == 0) {
- source_type = SDJF_NAMESPACE;
- source = NULL;
- }
- else if(strcmp(source, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) {
- source_type = SDJF_LOCAL | SDJF_SYSTEM;
- source = NULL;
- }
- else if(strcmp(source, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) {
- source_type = SDJF_LOCAL | SDJF_USER;
- source = NULL;
- }
- else if(strcmp(source, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) {
- source_type = SDJF_LOCAL | SDJF_OTHER;
- source = NULL;
+ buffer_json_member_add_array(wb, JOURNAL_PARAMETER_SOURCE);
+
+ BUFFER *sources_list = buffer_create(0, NULL);
+
+ source_type = SDJF_NONE;
+ while(value) {
+ char *sep = strchr(value, ',');
+ if(sep)
+ *sep++ = '\0';
+
+ buffer_json_add_array_item_string(wb, value);
+
+ if(strcmp(value, SDJF_SOURCE_ALL_NAME) == 0) {
+ source_type |= SDJF_ALL;
+ value = NULL;
+ }
+ else if(strcmp(value, SDJF_SOURCE_LOCAL_NAME) == 0) {
+ source_type |= SDJF_LOCAL_ALL;
+ value = NULL;
+ }
+ else if(strcmp(value, SDJF_SOURCE_REMOTES_NAME) == 0) {
+ source_type |= SDJF_REMOTE_ALL;
+ value = NULL;
+ }
+ else if(strcmp(value, SDJF_SOURCE_NAMESPACES_NAME) == 0) {
+ source_type |= SDJF_LOCAL_NAMESPACE;
+ value = NULL;
+ }
+ else if(strcmp(value, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) {
+ source_type |= SDJF_LOCAL_SYSTEM;
+ value = NULL;
+ }
+ else if(strcmp(value, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) {
+ source_type |= SDJF_LOCAL_USER;
+ value = NULL;
+ }
+ else if(strcmp(value, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) {
+ source_type |= SDJF_LOCAL_OTHER;
+ value = NULL;
+ }
+ else {
+ // else, match the source, whatever it is
+ if(buffer_strlen(sources_list))
+ buffer_strcat(sources_list, ",");
+
+ buffer_strcat(sources_list, value);
+ }
+
+ value = sep;
}
- else {
- source_type = SDJF_ALL;
- // else, match the source, whatever it is
+
+ if(buffer_strlen(sources_list)) {
+ simple_pattern_free(sources);
+ sources = simple_pattern_create(buffer_tostring(sources_list), ",", SIMPLE_PATTERN_EXACT, false);
}
+
+ buffer_free(sources_list);
+
+ buffer_json_array_close(wb); // source
}
else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) {
after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]);
@@ -2502,7 +1975,7 @@ static void function_systemd_journal(const char *transaction, char *function, in
fqs->data_only = data_only;
fqs->delta = (fqs->data_only) ? delta : false;
fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false;
- fqs->source = string_strdupz(source);
+ fqs->sources = sources;
fqs->source_type = source_type;
fqs->entries = last;
fqs->last_modified = 0;
@@ -2512,6 +1985,7 @@ static void function_systemd_journal(const char *transaction, char *function, in
fqs->direction = direction;
fqs->anchor.start_ut = anchor;
fqs->anchor.stop_ut = 0;
+ fqs->sampling = sampling;
if(fqs->anchor.start_ut && fqs->tail) {
// a tail request
@@ -2574,8 +2048,8 @@ static void function_systemd_journal(const char *transaction, char *function, in
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false);
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta);
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail);
+ buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_SAMPLING, fqs->sampling);
buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id);
- buffer_json_member_add_string(wb, JOURNAL_PARAMETER_SOURCE, string2str(fqs->source));
buffer_json_member_add_uint64(wb, "source_type", fqs->source_type);
buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC);
buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC);
@@ -2603,7 +2077,7 @@ static void function_systemd_journal(const char *transaction, char *function, in
buffer_json_member_add_string(wb, "id", "source");
buffer_json_member_add_string(wb, "name", "source");
buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query");
- buffer_json_member_add_string(wb, "type", "select");
+ buffer_json_member_add_string(wb, "type", "multiselect");
buffer_json_member_add_array(wb, "options");
{
available_journal_file_sources_to_json_array(wb);
@@ -2632,12 +2106,6 @@ static void function_systemd_journal(const char *transaction, char *function, in
response = netdata_systemd_journal_query(wb, facets, fqs);
// ------------------------------------------------------------------------
- // cleanup query params
-
- string_freez(fqs->source);
- fqs->source = NULL;
-
- // ------------------------------------------------------------------------
// handle error response
if(response != HTTP_RESP_OK) {
@@ -2653,6 +2121,7 @@ output:
netdata_mutex_unlock(&stdout_mutex);
cleanup:
+ simple_pattern_free(sources);
facets_destroy(facets);
buffer_free(wb);
@@ -2663,129 +2132,8 @@ cleanup:
}
}
-// ----------------------------------------------------------------------------
-
-int main(int argc __maybe_unused, char **argv __maybe_unused) {
- stderror = stderr;
- clocks_init();
-
- program_name = "systemd-journal.plugin";
-
- // disable syslog
- error_log_syslog = 0;
-
- // set errors flood protection to 100 logs per hour
- error_log_errors_per_period = 100;
- error_log_throttle_period = 3600;
-
- log_set_global_severity_for_external_plugins();
-
- netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
- if(verify_netdata_host_prefix() == -1) exit(1);
-
- // ------------------------------------------------------------------------
- // setup the journal directories
-
- unsigned d = 0;
-
- journal_directories[d++].path = strdupz("/var/log/journal");
- journal_directories[d++].path = strdupz("/run/log/journal");
-
- if(*netdata_configured_host_prefix) {
- char path[PATH_MAX];
- snprintfz(path, sizeof(path), "%s/var/log/journal", netdata_configured_host_prefix);
- journal_directories[d++].path = strdupz(path);
- snprintfz(path, sizeof(path), "%s/run/log/journal", netdata_configured_host_prefix);
- journal_directories[d++].path = strdupz(path);
- }
-
- // terminate the list
- journal_directories[d].path = NULL;
-
- // ------------------------------------------------------------------------
-
+void journal_init_query_status(void) {
function_query_status_dict = dictionary_create_advanced(
DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
NULL, sizeof(FUNCTION_QUERY_STATUS));
-
- // ------------------------------------------------------------------------
- // initialize the used hashes files registry
-
- used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
-
-
- // ------------------------------------------------------------------------
- // initialize the journal files registry
-
- systemd_journal_session = (now_realtime_usec() / USEC_PER_SEC) * USEC_PER_SEC;
-
- journal_files_registry = dictionary_create_advanced(
- DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
- NULL, sizeof(struct journal_file));
-
- dictionary_register_insert_callback(journal_files_registry, files_registry_insert_cb, NULL);
- dictionary_register_delete_callback(journal_files_registry, files_registry_delete_cb, NULL);
- dictionary_register_conflict_callback(journal_files_registry, files_registry_conflict_cb, NULL);
-
- boot_ids_to_first_ut = dictionary_create_advanced(
- DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
- NULL, sizeof(usec_t));
-
- journal_files_registry_update();
-
-
- // ------------------------------------------------------------------------
- // debug
-
- if(argc == 2 && strcmp(argv[1], "debug") == 0) {
- bool cancelled = false;
- char buf[] = "systemd-journal after:-16000000 before:0 last:1";
- // char buf[] = "systemd-journal after:1695332964 before:1695937764 direction:backward last:100 slice:true source:all DHKucpqUoe1:PtVoyIuX.MU";
- // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403";
- function_systemd_journal("123", buf, 600, &cancelled);
- exit(1);
- }
-
- // ------------------------------------------------------------------------
- // the event loop for functions
-
- struct functions_evloop_globals *wg =
- functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit);
-
- functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal,
- SYSTEMD_JOURNAL_DEFAULT_TIMEOUT);
-
-
- // ------------------------------------------------------------------------
-
- time_t started_t = now_monotonic_sec();
-
- size_t iteration = 0;
- usec_t step = 1000 * USEC_PER_MS;
- bool tty = isatty(fileno(stderr)) == 1;
-
- netdata_mutex_lock(&stdout_mutex);
- fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n",
- SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
-
- heartbeat_t hb;
- heartbeat_init(&hb);
- while(!plugin_should_exit) {
- iteration++;
-
- netdata_mutex_unlock(&stdout_mutex);
- heartbeat_next(&hb, step);
- netdata_mutex_lock(&stdout_mutex);
-
- if(!tty)
- fprintf(stdout, "\n");
-
- fflush(stdout);
-
- time_t now = now_monotonic_sec();
- if(now - started_t > 86400)
- break;
- }
-
- exit(0);
}