summaryrefslogtreecommitdiffstats
path: root/collectors/systemd-journal.plugin/systemd-journal.c
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/systemd-journal.plugin/systemd-journal.c')
-rw-r--r--collectors/systemd-journal.plugin/systemd-journal.c2664
1 files changed, 2427 insertions, 237 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c
index 304ff244a..c2bd98e7d 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/collectors/systemd-journal.plugin/systemd-journal.c
@@ -5,29 +5,110 @@
* GPL v3+
*/
-// TODO - 1) MARKDOC
-
#include "collectors/all.h"
#include "libnetdata/libnetdata.h"
#include "libnetdata/required_dummies.h"
-#ifndef SD_JOURNAL_ALL_NAMESPACES
-#define JOURNAL_NAMESPACE SD_JOURNAL_LOCAL_ONLY
-#else
-#define JOURNAL_NAMESPACE SD_JOURNAL_ALL_NAMESPACES
-#endif
-
+#include <linux/capability.h>
#include <systemd/sd-journal.h>
#include <syslog.h>
+/*
+ * TODO
+ *
+ * _UDEV_DEVLINK is frequently set more than once per field - support multi-value faces
+ *
+ */
+
+
+// ----------------------------------------------------------------------------
+// fstat64 overloading to speed up libsystemd
+// https://github.com/systemd/systemd/pull/29261
+
+#define ND_SD_JOURNAL_OPEN_FLAGS (0)
+
+#include <dlfcn.h>
+#include <sys/stat.h>
+
+#define FSTAT_CACHE_MAX 1024
+struct fdstat64_cache_entry {
+ bool enabled;
+ bool updated;
+ int err_no;
+ struct stat64 stat;
+ int ret;
+ size_t cached_count;
+ size_t session;
+};
+
+struct fdstat64_cache_entry fstat64_cache[FSTAT_CACHE_MAX] = {0 };
+static __thread size_t fstat_thread_calls = 0;
+static __thread size_t fstat_thread_cached_responses = 0;
+static __thread bool enable_thread_fstat = false;
+static __thread size_t fstat_caching_thread_session = 0;
+static size_t fstat_caching_global_session = 0;
+
+static void fstat_cache_enable_on_thread(void) {
+ fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_ACQUIRE);
+ enable_thread_fstat = true;
+}
+
+static void fstat_cache_disable_on_thread(void) {
+ fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_RELEASE);
+ enable_thread_fstat = false;
+}
+
+int fstat64(int fd, struct stat64 *buf) {
+ static int (*real_fstat)(int, struct stat64 *) = NULL;
+ if (!real_fstat)
+ real_fstat = dlsym(RTLD_NEXT, "fstat64");
+
+ fstat_thread_calls++;
+
+ if(fd >= 0 && fd < FSTAT_CACHE_MAX) {
+ if(enable_thread_fstat && fstat64_cache[fd].session != fstat_caching_thread_session) {
+ fstat64_cache[fd].session = fstat_caching_thread_session;
+ fstat64_cache[fd].enabled = true;
+ fstat64_cache[fd].updated = false;
+ }
+
+ if(fstat64_cache[fd].enabled && fstat64_cache[fd].updated && fstat64_cache[fd].session == fstat_caching_thread_session) {
+ fstat_thread_cached_responses++;
+ errno = fstat64_cache[fd].err_no;
+ *buf = fstat64_cache[fd].stat;
+ fstat64_cache[fd].cached_count++;
+ return fstat64_cache[fd].ret;
+ }
+ }
+
+ int ret = real_fstat(fd, buf);
+
+ if(fd >= 0 && fd < FSTAT_CACHE_MAX && fstat64_cache[fd].enabled) {
+ fstat64_cache[fd].ret = ret;
+ fstat64_cache[fd].updated = true;
+ fstat64_cache[fd].err_no = errno;
+ fstat64_cache[fd].stat = *buf;
+ fstat64_cache[fd].session = fstat_caching_thread_session;
+ }
+
+ return ret;
+}
+
+// ----------------------------------------------------------------------------
+
#define FACET_MAX_VALUE_LENGTH 8192
+#define SYSTEMD_JOURNAL_MAX_SOURCE_LEN 64
#define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries."
#define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal"
-#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 30
+#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60
#define SYSTEMD_JOURNAL_MAX_PARAMS 100
-#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600)
+#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600)
#define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200
+#define SYSTEMD_JOURNAL_WORKER_THREADS 5
+
+#define JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT (5 * USEC_PER_SEC) // assume always 5 seconds latency
+#define JOURNAL_VS_REALTIME_DELTA_MAX_UT (2 * 60 * USEC_PER_SEC) // up to 2 minutes latency
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
@@ -35,147 +116,1540 @@
#define JOURNAL_PARAMETER_ANCHOR "anchor"
#define JOURNAL_PARAMETER_LAST "last"
#define JOURNAL_PARAMETER_QUERY "query"
+#define JOURNAL_PARAMETER_FACETS "facets"
+#define JOURNAL_PARAMETER_HISTOGRAM "histogram"
+#define JOURNAL_PARAMETER_DIRECTION "direction"
+#define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since"
+#define JOURNAL_PARAMETER_DATA_ONLY "data_only"
+#define JOURNAL_PARAMETER_SOURCE "source"
+#define JOURNAL_PARAMETER_INFO "info"
+#define JOURNAL_PARAMETER_ID "id"
+#define JOURNAL_PARAMETER_PROGRESS "progress"
+#define JOURNAL_PARAMETER_SLICE "slice"
+#define JOURNAL_PARAMETER_DELTA "delta"
+#define JOURNAL_PARAMETER_TAIL "tail"
+
+#define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE"
+#define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS"
+
+#define JOURNAL_DEFAULT_SLICE_MODE true
+#define JOURNAL_DEFAULT_DIRECTION FACETS_ANCHOR_DIRECTION_BACKWARD
#define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL
-#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS NULL
+
+#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \
+ "*MESSAGE*" \
+ "|*_RAW" \
+ "|*_USEC" \
+ "|*_NSEC" \
+ "|*TIMESTAMP*" \
+ "|*_ID" \
+ "|*_ID_*" \
+ "|__*" \
+ ""
+
#define SYSTEMD_KEYS_INCLUDED_IN_FACETS \
- "_TRANSPORT" \
- "|SYSLOG_IDENTIFIER" \
- "|SYSLOG_FACILITY" \
+ \
+ /* --- USER JOURNAL FIELDS --- */ \
+ \
+ /* "|MESSAGE" */ \
+ /* "|MESSAGE_ID" */ \
"|PRIORITY" \
- "|_HOSTNAME" \
- "|_RUNTIME_SCOPE" \
- "|_PID" \
+ "|CODE_FILE" \
+ /* "|CODE_LINE" */ \
+ "|CODE_FUNC" \
+ "|ERRNO" \
+ /* "|INVOCATION_ID" */ \
+ /* "|USER_INVOCATION_ID" */ \
+ "|SYSLOG_FACILITY" \
+ "|SYSLOG_IDENTIFIER" \
+ /* "|SYSLOG_PID" */ \
+ /* "|SYSLOG_TIMESTAMP" */ \
+ /* "|SYSLOG_RAW" */ \
+ /* "!DOCUMENTATION" */ \
+ /* "|TID" */ \
+ "|UNIT" \
+ "|USER_UNIT" \
+ "|UNIT_RESULT" /* undocumented */ \
+ \
+ \
+ /* --- TRUSTED JOURNAL FIELDS --- */ \
+ \
+ /* "|_PID" */ \
"|_UID" \
"|_GID" \
- "|_SYSTEMD_UNIT" \
- "|_SYSTEMD_SLICE" \
- "|_SYSTEMD_USER_SLICE" \
"|_COMM" \
"|_EXE" \
+ /* "|_CMDLINE" */ \
+ "|_CAP_EFFECTIVE" \
+ /* "|_AUDIT_SESSION" */ \
+ "|_AUDIT_LOGINUID" \
"|_SYSTEMD_CGROUP" \
+ "|_SYSTEMD_SLICE" \
+ "|_SYSTEMD_UNIT" \
"|_SYSTEMD_USER_UNIT" \
- "|USER_UNIT" \
- "|UNIT" \
+ "|_SYSTEMD_USER_SLICE" \
+ "|_SYSTEMD_SESSION" \
+ "|_SYSTEMD_OWNER_UID" \
+ "|_SELINUX_CONTEXT" \
+ /* "|_SOURCE_REALTIME_TIMESTAMP" */ \
+ "|_BOOT_ID" \
+ "|_MACHINE_ID" \
+ /* "|_SYSTEMD_INVOCATION_ID" */ \
+ "|_HOSTNAME" \
+ "|_TRANSPORT" \
+ "|_STREAM_ID" \
+ /* "|LINE_BREAK" */ \
+ "|_NAMESPACE" \
+ "|_RUNTIME_SCOPE" \
+ \
+ \
+ /* --- KERNEL JOURNAL FIELDS --- */ \
+ \
+ /* "|_KERNEL_DEVICE" */ \
+ "|_KERNEL_SUBSYSTEM" \
+ /* "|_UDEV_SYSNAME" */ \
+ "|_UDEV_DEVNODE" \
+ /* "|_UDEV_DEVLINK" */ \
+ \
+ \
+ /* --- LOGGING ON BEHALF --- */ \
+ \
+ "|OBJECT_UID" \
+ "|OBJECT_GID" \
+ "|OBJECT_COMM" \
+ "|OBJECT_EXE" \
+ /* "|OBJECT_CMDLINE" */ \
+ /* "|OBJECT_AUDIT_SESSION" */ \
+ "|OBJECT_AUDIT_LOGINUID" \
+ "|OBJECT_SYSTEMD_CGROUP" \
+ "|OBJECT_SYSTEMD_SESSION" \
+ "|OBJECT_SYSTEMD_OWNER_UID" \
+ "|OBJECT_SYSTEMD_UNIT" \
+ "|OBJECT_SYSTEMD_USER_UNIT" \
+ \
+ \
+ /* --- CORE DUMPS --- */ \
+ \
+ "|COREDUMP_COMM" \
+ "|COREDUMP_UNIT" \
+ "|COREDUMP_USER_UNIT" \
+ "|COREDUMP_SIGNAL_NAME" \
+ "|COREDUMP_CGROUP" \
+ \
+ \
+ /* --- DOCKER --- */ \
+ \
+ "|CONTAINER_ID" \
+ /* "|CONTAINER_ID_FULL" */ \
+ "|CONTAINER_NAME" \
+ "|CONTAINER_TAG" \
+ "|IMAGE_NAME" /* undocumented */ \
+ /* "|CONTAINER_PARTIAL_MESSAGE" */ \
+ \
""
-static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
+static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER;
static bool plugin_should_exit = false;
-DICTIONARY *uids = NULL;
-DICTIONARY *gids = NULL;
+// ----------------------------------------------------------------------------
+typedef enum {
+ ND_SD_JOURNAL_NO_FILE_MATCHED,
+ ND_SD_JOURNAL_FAILED_TO_OPEN,
+ ND_SD_JOURNAL_FAILED_TO_SEEK,
+ ND_SD_JOURNAL_TIMED_OUT,
+ ND_SD_JOURNAL_OK,
+ ND_SD_JOURNAL_NOT_MODIFIED,
+ ND_SD_JOURNAL_CANCELLED,
+} ND_SD_JOURNAL_STATUS;
+
+typedef enum {
+ SDJF_ALL = 0,
+ SDJF_LOCAL = (1 << 0),
+ SDJF_REMOTE = (1 << 1),
+ SDJF_SYSTEM = (1 << 2),
+ SDJF_USER = (1 << 3),
+ SDJF_NAMESPACE = (1 << 4),
+ SDJF_OTHER = (1 << 5),
+} SD_JOURNAL_FILE_SOURCE_TYPE;
+
+typedef struct function_query_status {
+ bool *cancelled; // a pointer to the cancelling boolean
+ usec_t stop_monotonic_ut;
+
+ usec_t started_monotonic_ut;
+
+ // request
+ SD_JOURNAL_FILE_SOURCE_TYPE source_type;
+ STRING *source;
+ usec_t after_ut;
+ usec_t before_ut;
+
+ struct {
+ usec_t start_ut;
+ usec_t stop_ut;
+ } anchor;
+
+ FACETS_ANCHOR_DIRECTION direction;
+ size_t entries;
+ usec_t if_modified_since;
+ bool delta;
+ bool tail;
+ bool data_only;
+ bool slice;
+ size_t filters;
+ usec_t last_modified;
+ const char *query;
+ const char *histogram;
+
+ // per file progress info
+ size_t cached_count;
+
+ // progress statistics
+ usec_t matches_setup_ut;
+ size_t rows_useful;
+ size_t rows_read;
+ size_t bytes_read;
+ size_t files_matched;
+ size_t file_working;
+} FUNCTION_QUERY_STATUS;
+
+struct journal_file {
+ const char *filename;
+ size_t filename_len;
+ STRING *source;
+ SD_JOURNAL_FILE_SOURCE_TYPE source_type;
+ usec_t file_last_modified_ut;
+ usec_t msg_first_ut;
+ usec_t msg_last_ut;
+ usec_t last_scan_ut;
+ size_t size;
+ bool logged_failure;
+ usec_t max_journal_vs_realtime_delta_ut;
+};
+
+static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) {
+ netdata_log_error("ERROR: %s, on query "
+ "timeframe [%"PRIu64" - %"PRIu64"], "
+ "anchor [%"PRIu64" - %"PRIu64"], "
+ "if_modified_since %"PRIu64", "
+ "data_only:%s, delta:%s, tail:%s, direction:%s"
+ , msg
+ , fqs->after_ut, fqs->before_ut
+ , fqs->anchor.start_ut, fqs->anchor.stop_ut
+ , fqs->if_modified_since
+ , fqs->data_only ? "true" : "false"
+ , fqs->delta ? "true" : "false"
+ , fqs->tail ? "tail" : "false"
+ , fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
+}
-// ----------------------------------------------------------------------------
+static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) {
+ if(sd_journal_seek_realtime_usec(j, timestamp) < 0) {
+ netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp);
+ if(sd_journal_seek_tail(j) < 0) {
+ netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail");
+ return false;
+ }
+ }
+
+ return true;
+}
+
+#define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP"
-int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t stop_monotonic_ut) {
- sd_journal *j;
- int r;
+static inline bool parse_journal_field(const char *data, size_t data_length, const char **key, size_t *key_length, const char **value, size_t *value_length) {
+ const char *k = data;
+ const char *equal = strchr(k, '=');
+ if(unlikely(!equal))
+ return false;
- // Open the system journal for reading
- r = sd_journal_open(&j, JOURNAL_NAMESPACE);
- if (r < 0)
- return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ size_t kl = equal - k;
+
+ const char *v = ++equal;
+ size_t vl = data_length - kl - 1;
+
+ *key = k;
+ *key_length = kl;
+ *value = v;
+ *value_length = vl;
+
+ return true;
+}
+
+static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) {
+ const void *data;
+ size_t length, bytes = 0;
+
+ facets_add_key_value_length(facets, JOURNAL_KEY_ND_JOURNAL_FILE, sizeof(JOURNAL_KEY_ND_JOURNAL_FILE) - 1, jf->filename, jf->filename_len);
+
+ SD_JOURNAL_FOREACH_DATA(j, data, length) {
+ const char *key, *value;
+ size_t key_length, value_length;
+
+ if(!parse_journal_field(data, length, &key, &key_length, &value, &value_length))
+ continue;
+
+#ifdef NETDATA_INTERNAL_CHECKS
+ usec_t origin_journal_ut = *msg_ut;
+#endif
+ if(unlikely(key_length == sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1 &&
+ memcmp(key, JD_SOURCE_REALTIME_TIMESTAMP, sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1) == 0)) {
+ usec_t ut = str2ull(value, NULL);
+ if(ut && ut < *msg_ut) {
+ usec_t delta = *msg_ut - ut;
+ *msg_ut = ut;
+
+ if(delta > JOURNAL_VS_REALTIME_DELTA_MAX_UT)
+ delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
+
+ // update max_journal_vs_realtime_delta_ut if the delta increased
+ usec_t expected = jf->max_journal_vs_realtime_delta_ut;
+ do {
+ if(delta <= expected)
+ break;
+ } while(!__atomic_compare_exchange_n(&jf->max_journal_vs_realtime_delta_ut, &expected, delta, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ internal_error(delta > expected,
+ "increased max_journal_vs_realtime_delta_ut from %"PRIu64" to %"PRIu64", "
+ "journal %"PRIu64", actual %"PRIu64" (delta %"PRIu64")"
+ , expected, delta, origin_journal_ut, *msg_ut, origin_journal_ut - (*msg_ut));
+ }
+ }
+
+ bytes += length;
+ facets_add_key_value_length(facets, key, key_length, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
+ }
+
+ return bytes;
+}
+
+#define FUNCTION_PROGRESS_UPDATE_ROWS(rows_read, rows) __atomic_fetch_add(&(rows_read), rows, __ATOMIC_RELAXED)
+#define FUNCTION_PROGRESS_UPDATE_BYTES(bytes_read, bytes) __atomic_fetch_add(&(bytes_read), bytes, __ATOMIC_RELAXED)
+#define FUNCTION_PROGRESS_EVERY_ROWS (1ULL << 13)
+#define FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS (1ULL << 7)
+
+static inline ND_SD_JOURNAL_STATUS check_stop(const bool *cancelled, const usec_t *stop_monotonic_ut) {
+ if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) {
+ internal_error(true, "Function has been cancelled");
+ return ND_SD_JOURNAL_CANCELLED;
+ }
+
+ if(now_monotonic_usec() > __atomic_load_n(stop_monotonic_ut, __ATOMIC_RELAXED)) {
+ internal_error(true, "Function timed out");
+ return ND_SD_JOURNAL_TIMED_OUT;
+ }
+
+ return ND_SD_JOURNAL_OK;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
+ sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
+ struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
+
+ usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED);
+
+ usec_t start_ut = ((fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->before_ut) + anchor_delta;
+ usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut;
+ bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
+
+ if(!netdata_systemd_journal_seek_to(j, start_ut))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
+
+ size_t errors_no_timestamp = 0;
+ usec_t earliest_msg_ut = 0;
+ size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
+ size_t bytes = 0, last_bytes = 0;
+
+ usec_t last_usec_from = 0;
+ usec_t last_usec_to = 0;
+
+ ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
facets_rows_begin(facets);
+ while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) {
+ usec_t msg_ut = 0;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) {
+ errors_no_timestamp++;
+ continue;
+ }
+
+ if(unlikely(msg_ut > earliest_msg_ut))
+ earliest_msg_ut = msg_ut;
- bool timed_out = false;
- size_t row_counter = 0;
- sd_journal_seek_realtime_usec(j, before_ut);
- SD_JOURNAL_FOREACH_BACKWARDS(j) {
- row_counter++;
+ if (unlikely(msg_ut > start_ut))
+ continue;
+
+ if (unlikely(msg_ut < stop_ut))
+ break;
- uint64_t msg_ut;
- sd_journal_get_realtime_usec(j, &msg_ut);
- if (msg_ut < after_ut)
+ bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
+
+ // make sure each line gets a unique timestamp
+ if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
+ msg_ut = --last_usec_from;
+ else
+ last_usec_from = last_usec_to = msg_ut;
+
+ if(facets_row_finished(facets, msg_ut))
+ rows_useful++;
+
+ row_counter++;
+ if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
+ stop_when_full &&
+ facets_rows(facets) >= fqs->entries)) {
+ // stop the data only query
+ usec_t oldest = facets_row_oldest_ut(facets);
+ if(oldest && msg_ut < (oldest - anchor_delta))
break;
+ }
- const void *data;
- size_t length;
- SD_JOURNAL_FOREACH_DATA(j, data, length) {
- const char *key = data;
- const char *equal = strchr(key, '=');
- if(unlikely(!equal))
- continue;
+ if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
+ FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
+ last_row_counter = row_counter;
- const char *value = ++equal;
- size_t key_length = value - key; // including '\0'
+ FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
+ last_bytes = bytes;
- char key_copy[key_length];
- memcpy(key_copy, key, key_length - 1);
- key_copy[key_length - 1] = '\0';
+ status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ }
+ }
- size_t value_length = length - key_length; // without '\0'
- facets_add_key_value_length(facets, key_copy, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH);
- }
+ FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
+ FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
+
+ fqs->rows_useful += rows_useful;
+
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ if(earliest_msg_ut > fqs->last_modified)
+ fqs->last_modified = earliest_msg_ut;
+
+ return status;
+}
+
+ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
+ sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets,
+ struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
+
+ usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED);
+
+ usec_t start_ut = (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->after_ut;
+ usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta;
+ bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut);
+
+ if(!netdata_systemd_journal_seek_to(j, start_ut))
+ return ND_SD_JOURNAL_FAILED_TO_SEEK;
+
+ size_t errors_no_timestamp = 0;
+ usec_t earliest_msg_ut = 0;
+ size_t row_counter = 0, last_row_counter = 0, rows_useful = 0;
+ size_t bytes = 0, last_bytes = 0;
+
+ usec_t last_usec_from = 0;
+ usec_t last_usec_to = 0;
+
+ ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK;
+
+ facets_rows_begin(facets);
+ while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) {
+ usec_t msg_ut = 0;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) {
+ errors_no_timestamp++;
+ continue;
+ }
+
+ if(likely(msg_ut > earliest_msg_ut))
+ earliest_msg_ut = msg_ut;
- facets_row_finished(facets, msg_ut);
+ if (unlikely(msg_ut < start_ut))
+ continue;
- if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) {
- timed_out = true;
+ if (unlikely(msg_ut > stop_ut))
+ break;
+
+ bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut);
+
+ // make sure each line gets a unique timestamp
+ if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to))
+ msg_ut = ++last_usec_to;
+ else
+ last_usec_from = last_usec_to = msg_ut;
+
+ if(facets_row_finished(facets, msg_ut))
+ rows_useful++;
+
+ row_counter++;
+ if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 &&
+ stop_when_full &&
+ facets_rows(facets) >= fqs->entries)) {
+ // stop the data only query
+ usec_t newest = facets_row_newest_ut(facets);
+ if(newest && msg_ut > (newest + anchor_delta))
break;
+ }
+
+ if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) {
+ FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
+ last_row_counter = row_counter;
+
+ FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
+ last_bytes = bytes;
+
+ status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ }
+ }
+
+ FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter);
+ FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
+
+ fqs->rows_useful += rows_useful;
+
+ if(errors_no_timestamp)
+ netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp);
+
+ if(earliest_msg_ut > fqs->last_modified)
+ fqs->last_modified = earliest_msg_ut;
+
+ return status;
+}
+
+bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) {
+ // return true, if data have been modified since the timestamp
+
+ if(!last_modified || !seek_to)
+ return false;
+
+ if(!netdata_systemd_journal_seek_to(j, seek_to))
+ return false;
+
+ usec_t first_msg_ut = 0;
+ while (sd_journal_previous(j) > 0) {
+ usec_t msg_ut;
+ if(sd_journal_get_realtime_usec(j, &msg_ut) < 0)
+ continue;
+
+ first_msg_ut = msg_ut;
+ break;
+ }
+
+ return first_msg_ut != last_modified;
+}
+
+#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
+static bool netdata_systemd_filtering_by_journal(sd_journal *j, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
+ const char *field = NULL;
+ const void *data = NULL;
+ size_t data_length;
+ size_t added_keys = 0;
+ size_t failures = 0;
+ size_t filters_added = 0;
+
+ SD_JOURNAL_FOREACH_FIELD(j, field) {
+ bool interesting;
+
+ if(fqs->data_only)
+ interesting = facets_key_name_is_filter(facets, field);
+ else
+ interesting = facets_key_name_is_facet(facets, field);
+
+ if(interesting) {
+ if(sd_journal_query_unique(j, field) >= 0) {
+ bool added_this_key = false;
+ size_t added_values = 0;
+
+ SD_JOURNAL_FOREACH_UNIQUE(j, data, data_length) {
+ const char *key, *value;
+ size_t key_length, value_length;
+
+ if(!parse_journal_field(data, data_length, &key, &key_length, &value, &value_length))
+ continue;
+
+ facets_add_possible_value_name_to_key(facets, key, key_length, value, value_length);
+
+ if(!facets_key_name_value_length_is_selected(facets, key, key_length, value, value_length))
+ continue;
+
+ if(added_keys && !added_this_key) {
+ if(sd_journal_add_conjunction(j) < 0)
+ failures++;
+
+ added_this_key = true;
+ added_keys++;
+ }
+ else if(added_values)
+ if(sd_journal_add_disjunction(j) < 0)
+ failures++;
+
+ if(sd_journal_add_match(j, data, data_length) < 0)
+ failures++;
+
+ added_values++;
+ filters_added++;
+ }
}
}
+ }
+
+ if(failures) {
+ log_fqs(fqs, "failed to setup journal filter, will run the full query.");
+ sd_journal_flush_matches(j);
+ return true;
+ }
+
+ return filters_added ? true : false;
+}
+#endif // HAVE_SD_JOURNAL_RESTART_FIELDS
+
+static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file(
+ const char *filename, BUFFER *wb, FACETS *facets,
+ struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
+
+ sd_journal *j = NULL;
+ errno = 0;
+
+ fstat_cache_enable_on_thread();
+
+ const char *paths[2] = {
+ [0] = filename,
+ [1] = NULL,
+ };
+
+ if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
+ fstat_cache_disable_on_thread();
+ return ND_SD_JOURNAL_FAILED_TO_OPEN;
+ }
+
+ ND_SD_JOURNAL_STATUS status;
+ bool matches_filters = true;
+
+#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
+ if(fqs->slice) {
+ usec_t started = now_monotonic_usec();
+
+ matches_filters = netdata_systemd_filtering_by_journal(j, facets, fqs) || !fqs->filters;
+ usec_t ended = now_monotonic_usec();
+
+ fqs->matches_setup_ut += (ended - started);
+ }
+#endif // HAVE_SD_JOURNAL_RESTART_FIELDS
+
+ if(matches_filters) {
+ if(fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD)
+ status = netdata_systemd_journal_query_forward(j, wb, facets, jf, fqs);
+ else
+ status = netdata_systemd_journal_query_backward(j, wb, facets, jf, fqs);
+ }
+ else
+ status = ND_SD_JOURNAL_NO_FILE_MATCHED;
+
+ sd_journal_close(j);
+ fstat_cache_disable_on_thread();
+
+ return status;
+}
+
+// ----------------------------------------------------------------------------
+// journal files registry
+
+#define VAR_LOG_JOURNAL_MAX_DEPTH 10
+#define MAX_JOURNAL_DIRECTORIES 100
+
+struct journal_directory {
+ char *path;
+ bool logged_failure;
+};
+
+static struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES] = { 0 };
+static DICTIONARY *journal_files_registry = NULL;
+static DICTIONARY *used_hashes_registry = NULL;
+
+static usec_t systemd_journal_session = 0;
+
+static void buffer_json_journal_versions(BUFFER *wb) {
+ buffer_json_member_add_object(wb, "versions");
+ {
+ buffer_json_member_add_uint64(wb, "sources",
+ systemd_journal_session + dictionary_version(journal_files_registry));
+ }
+ buffer_json_object_close(wb);
+}
+
+static void journal_file_update_msg_ut(const char *filename, struct journal_file *jf) {
+ fstat_cache_enable_on_thread();
+
+ const char *files[2] = {
+ [0] = filename,
+ [1] = NULL,
+ };
+
+ sd_journal *j = NULL;
+ if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) {
+ fstat_cache_disable_on_thread();
+
+ if(!jf->logged_failure) {
+ netdata_log_error("cannot open journal file '%s', using file timestamps to understand time-frame.", filename);
+ jf->logged_failure = true;
+ }
+
+ jf->msg_first_ut = 0;
+ jf->msg_last_ut = jf->file_last_modified_ut;
+ return;
+ }
+
+ usec_t first_ut = 0, last_ut = 0;
+
+ if(sd_journal_seek_head(j) < 0 || sd_journal_next(j) < 0 || sd_journal_get_realtime_usec(j, &first_ut) < 0 || !first_ut) {
+ internal_error(true, "cannot find the timestamp of the first message in '%s'", filename);
+ first_ut = 0;
+ }
+
+ if(sd_journal_seek_tail(j) < 0 || sd_journal_previous(j) < 0 || sd_journal_get_realtime_usec(j, &last_ut) < 0 || !last_ut) {
+ internal_error(true, "cannot find the timestamp of the last message in '%s'", filename);
+ last_ut = jf->file_last_modified_ut;
+ }
sd_journal_close(j);
+ fstat_cache_disable_on_thread();
+
+ if(first_ut > last_ut) {
+ internal_error(true, "timestamps are flipped in file '%s'", filename);
+ usec_t t = first_ut;
+ first_ut = last_ut;
+ last_ut = t;
+ }
+
+ jf->msg_first_ut = first_ut;
+ jf->msg_last_ut = last_ut;
+}
+
+static STRING *string_strdupz_source(const char *s, const char *e, size_t max_len, const char *prefix) {
+ char buf[max_len];
+ size_t len;
+ char *dst = buf;
+
+ if(prefix) {
+ len = strlen(prefix);
+ memcpy(buf, prefix, len);
+ dst = &buf[len];
+ max_len -= len;
+ }
+
+ len = e - s;
+ if(len >= max_len)
+ len = max_len - 1;
+ memcpy(dst, s, len);
+ dst[len] = '\0';
+ buf[max_len - 1] = '\0';
+
+ for(size_t i = 0; buf[i] ;i++)
+ if(!isalnum(buf[i]) && buf[i] != '-' && buf[i] != '.' && buf[i] != ':')
+ buf[i] = '_';
+
+ return string_strdupz(buf);
+}
+
+static void files_registry_insert_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) {
+ struct journal_file *jf = value;
+ jf->filename = dictionary_acquired_item_name(item);
+ jf->filename_len = strlen(jf->filename);
+
+ // based on the filename
+ // decide the source to show to the user
+ const char *s = strrchr(jf->filename, '/');
+ if(s) {
+ if(strstr(jf->filename, "/remote/"))
+ jf->source_type = SDJF_REMOTE;
+ else {
+ const char *t = s - 1;
+ while(t >= jf->filename && *t != '.' && *t != '/')
+ t--;
+
+ if(t >= jf->filename && *t == '.') {
+ jf->source_type = SDJF_NAMESPACE;
+ jf->source = string_strdupz_source(t + 1, s, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "namespace-");
+ }
+ else
+ jf->source_type = SDJF_LOCAL;
+ }
+
+ if(strncmp(s, "/system", 7) == 0)
+ jf->source_type |= SDJF_SYSTEM;
+
+ else if(strncmp(s, "/user", 5) == 0)
+ jf->source_type |= SDJF_USER;
+
+ else if(strncmp(s, "/remote-", 8) == 0) {
+ jf->source_type |= SDJF_REMOTE;
+
+ s = &s[8]; // skip "/remote-"
+
+ char *e = strchr(s, '@');
+ if(!e)
+ e = strstr(s, ".journal");
+
+ if(e) {
+ const char *d = s;
+ for(; d < e && (isdigit(*d) || *d == '.' || *d == ':') ; d++) ;
+ if(d == e) {
+ // a valid IP address
+ char ip[e - s + 1];
+ memcpy(ip, s, e - s);
+ ip[e - s] = '\0';
+ char buf[SYSTEMD_JOURNAL_MAX_SOURCE_LEN];
+ if(ip_to_hostname(ip, buf, sizeof(buf)))
+ jf->source = string_strdupz_source(buf, &buf[strlen(buf)], SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
+ else {
+ internal_error(true, "Cannot find the hostname for IP '%s'", ip);
+ jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
+ }
+ }
+ else
+ jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-");
+ }
+ else
+ jf->source_type |= SDJF_OTHER;
+ }
+ else
+ jf->source_type |= SDJF_OTHER;
+ }
+ else
+ jf->source_type = SDJF_LOCAL | SDJF_OTHER;
+
+ journal_file_update_msg_ut(jf->filename, jf);
+
+ internal_error(true,
+ "found journal file '%s', type %d, source '%s', "
+ "file modified: %"PRIu64", "
+ "msg {first: %"PRIu64", last: %"PRIu64"}",
+ jf->filename, jf->source_type, jf->source ? string2str(jf->source) : "<unset>",
+ jf->file_last_modified_ut,
+ jf->msg_first_ut, jf->msg_last_ut);
+}
+
+static bool files_registry_conflict_cb(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *data __maybe_unused) {
+ struct journal_file *jf = old_value;
+ struct journal_file *njf = new_value;
+
+ if(njf->last_scan_ut > jf->last_scan_ut)
+ jf->last_scan_ut = njf->last_scan_ut;
+
+ if(njf->file_last_modified_ut > jf->file_last_modified_ut) {
+ jf->file_last_modified_ut = njf->file_last_modified_ut;
+ jf->size = njf->size;
+
+ const char *filename = dictionary_acquired_item_name(item);
+ journal_file_update_msg_ut(filename, jf);
+
+// internal_error(true,
+// "updated journal file '%s', type %d, "
+// "file modified: %"PRIu64", "
+// "msg {first: %"PRIu64", last: %"PRIu64"}",
+// filename, jf->source_type,
+// jf->file_last_modified_ut,
+// jf->msg_first_ut, jf->msg_last_ut);
+ }
+
+ return false;
+}
+
+#define SDJF_SOURCE_ALL_NAME "all"
+#define SDJF_SOURCE_LOCAL_NAME "all-local-logs"
+#define SDJF_SOURCE_LOCAL_SYSTEM_NAME "all-local-system-logs"
+#define SDJF_SOURCE_LOCAL_USERS_NAME "all-local-user-logs"
+#define SDJF_SOURCE_LOCAL_OTHER_NAME "all-uncategorized"
+#define SDJF_SOURCE_NAMESPACES_NAME "all-local-namespaces"
+#define SDJF_SOURCE_REMOTES_NAME "all-remote-systems"
+
+struct journal_file_source {
+ usec_t first_ut;
+ usec_t last_ut;
+ size_t count;
+ uint64_t size;
+};
+
+static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) {
+ if(size > 1024ULL * 1024 * 1024 * 1024)
+ snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0);
+ else if(size > 1024ULL * 1024 * 1024)
+ snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0);
+ else if(size > 1024ULL * 1024)
+ snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0);
+ else if(size > 1024ULL)
+ snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0);
+ else
+ snprintfz(dst, dst_len, "%"PRIu64" B", size);
+}
+
+#define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \
+ if((remaining) > (duration)) { \
+ uint64_t _count = (remaining) / (duration); \
+ uint64_t _rem = (remaining) - (_count * (duration)); \
+ (pos) += snprintfz(&(dst)[pos], (dst_len) - (pos), "%s%s%"PRIu64" %s", (printed) ? ", " : "", _rem ? "" : "and ", _count, _count > 1 ? (many) : (one)); \
+ (remaining) = _rem; \
+ (printed) = true; \
+ } \
+} while(0)
+
+static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) {
+ if(duration_s < 0)
+ duration_s = -duration_s;
+
+ size_t pos = 0;
+ dst[0] = 0 ;
+
+ bool printed = false;
+ print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed);
+ print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed);
+ print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed);
+ print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed);
+ print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed);
+ print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed);
+}
+
+static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) {
+ struct journal_file_source *jfs = entry;
+ BUFFER *wb = data;
+
+ const char *name = dictionary_acquired_item_name(item);
+
+ buffer_json_add_array_item_object(wb);
+ {
+ char size_for_humans[100];
+ human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans));
+
+ char duration_for_humans[1024];
+ human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC),
+ duration_for_humans, sizeof(duration_for_humans));
+
+ char info[1024];
+ snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s",
+ jfs->count, size_for_humans, duration_for_humans);
+
+ buffer_json_member_add_string(wb, "id", name);
+ buffer_json_member_add_string(wb, "name", name);
+ buffer_json_member_add_string(wb, "pill", size_for_humans);
+ buffer_json_member_add_string(wb, "info", info);
+ }
+ buffer_json_object_close(wb); // options object
+
+ return 1;
+}
+
+static bool journal_file_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) {
+ struct journal_file_source *jfs = old_value, *njfs = new_value;
+ jfs->count += njfs->count;
+ jfs->size += njfs->size;
+
+ if(njfs->first_ut && njfs->first_ut < jfs->first_ut)
+ jfs->first_ut = njfs->first_ut;
+
+ if(njfs->last_ut && njfs->last_ut > jfs->last_ut)
+ jfs->last_ut = njfs->last_ut;
+
+ return false;
+}
+
+static void available_journal_file_sources_to_json_array(BUFFER *wb) {
+ DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_conflict_callback(dict, journal_file_merge_sizes, NULL);
+
+ struct journal_file_source t = { 0 };
+
+ struct journal_file *jf;
+ dfe_start_read(journal_files_registry, jf) {
+ t.first_ut = jf->msg_first_ut;
+ t.last_ut = jf->msg_last_ut;
+ t.count = 1;
+ t.size = jf->size;
+
+ dictionary_set(dict, SDJF_SOURCE_ALL_NAME, &t, sizeof(t));
+
+ if((jf->source_type & (SDJF_LOCAL)) == (SDJF_LOCAL))
+ dictionary_set(dict, SDJF_SOURCE_LOCAL_NAME, &t, sizeof(t));
+ if((jf->source_type & (SDJF_LOCAL | SDJF_SYSTEM)) == (SDJF_LOCAL | SDJF_SYSTEM))
+ dictionary_set(dict, SDJF_SOURCE_LOCAL_SYSTEM_NAME, &t, sizeof(t));
+ if((jf->source_type & (SDJF_LOCAL | SDJF_USER)) == (SDJF_LOCAL | SDJF_USER))
+ dictionary_set(dict, SDJF_SOURCE_LOCAL_USERS_NAME, &t, sizeof(t));
+ if((jf->source_type & (SDJF_LOCAL | SDJF_OTHER)) == (SDJF_LOCAL | SDJF_OTHER))
+ dictionary_set(dict, SDJF_SOURCE_LOCAL_OTHER_NAME, &t, sizeof(t));
+ if((jf->source_type & (SDJF_NAMESPACE)) == (SDJF_NAMESPACE))
+ dictionary_set(dict, SDJF_SOURCE_NAMESPACES_NAME, &t, sizeof(t));
+ if((jf->source_type & (SDJF_REMOTE)) == (SDJF_REMOTE))
+ dictionary_set(dict, SDJF_SOURCE_REMOTES_NAME, &t, sizeof(t));
+ if(jf->source)
+ dictionary_set(dict, string2str(jf->source), &t, sizeof(t));
+ }
+ dfe_done(jf);
+
+ dictionary_sorted_walkthrough_read(dict, journal_file_to_json_array_cb, wb);
+
+ dictionary_destroy(dict);
+}
+
+static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) {
+ struct journal_file *jf = value; (void)jf;
+ const char *filename = dictionary_acquired_item_name(item); (void)filename;
+
+ string_freez(jf->source);
+ internal_error(true, "removed journal file '%s'", filename);
+}
+
+void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_ut) {
+ static const char *ext = ".journal";
+ static const size_t ext_len = sizeof(".journal") - 1;
+
+ if (depth > VAR_LOG_JOURNAL_MAX_DEPTH)
+ return;
+
+ DIR *dir;
+ struct dirent *entry;
+ struct stat info;
+ char absolute_path[FILENAME_MAX];
+
+ // Open the directory.
+ if ((dir = opendir(dirname)) == NULL) {
+ if(errno != ENOENT && errno != ENOTDIR)
+ netdata_log_error("Cannot opendir() '%s'", dirname);
+ return;
+ }
+
+ // Read each entry in the directory.
+ while ((entry = readdir(dir)) != NULL) {
+ snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name);
+ if (stat(absolute_path, &info) != 0) {
+ netdata_log_error("Failed to stat() '%s", absolute_path);
+ continue;
+ }
+
+ if (S_ISDIR(info.st_mode)) {
+ // If entry is a directory, call traverse recursively.
+ if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0)
+ journal_directory_scan(absolute_path, depth + 1, last_scan_ut);
+
+ }
+ else if (S_ISREG(info.st_mode)) {
+ // If entry is a regular file, check if it ends with .journal.
+ char *filename = entry->d_name;
+ size_t len = strlen(filename);
+
+ if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) {
+ struct journal_file t = {
+ .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC,
+ .last_scan_ut = last_scan_ut,
+ .size = info.st_size,
+ .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT,
+ };
+ dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t));
+ }
+ }
+ }
+
+ closedir(dir);
+}
+
+static void journal_files_registry_update() {
+ usec_t scan_ut = now_monotonic_usec();
+
+ for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES ;i++) {
+ if(!journal_directories[i].path)
+ break;
+
+ journal_directory_scan(journal_directories[i].path, 0, scan_ut);
+ }
+
+ struct journal_file *jf;
+ dfe_start_write(journal_files_registry, jf) {
+ if(jf->last_scan_ut < scan_ut)
+ dictionary_del(journal_files_registry, jf_dfe.name);
+ }
+ dfe_done(jf);
+}
+
+// ----------------------------------------------------------------------------
+
+static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
+
+ if((fqs->source_type == SDJF_ALL || (jf->source_type & fqs->source_type) == fqs->source_type) &&
+ (!fqs->source || fqs->source == jf->source)) {
+
+ usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT;
+ usec_t first_ut = jf->msg_first_ut;
+ usec_t last_ut = jf->msg_last_ut + anchor_delta;
+
+ if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut)
+ return true;
+ }
+
+ return false;
+}
+
+static int journal_file_dict_items_backward_compar(const void *a, const void *b) {
+ const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b;
+ struct journal_file *jfa = dictionary_acquired_item_value(*ad);
+ struct journal_file *jfb = dictionary_acquired_item_value(*bd);
+
+ if(jfa->msg_last_ut < jfb->msg_last_ut)
+ return 1;
+
+ if(jfa->msg_last_ut > jfb->msg_last_ut)
+ return -1;
+
+ if(jfa->msg_first_ut < jfb->msg_first_ut)
+ return 1;
+
+ if(jfa->msg_first_ut > jfb->msg_first_ut)
+ return -1;
+
+ return 0;
+}
+
+static int journal_file_dict_items_forward_compar(const void *a, const void *b) {
+ return -journal_file_dict_items_backward_compar(a, b);
+}
+
+static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) {
+ ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_NO_FILE_MATCHED;
+ struct journal_file *jf;
+
+ fqs->files_matched = 0;
+ fqs->file_working = 0;
+ fqs->rows_useful = 0;
+ fqs->rows_read = 0;
+ fqs->bytes_read = 0;
+
+ size_t files_used = 0;
+ size_t files_max = dictionary_entries(journal_files_registry);
+ const DICTIONARY_ITEM *file_items[files_max];
+
+ // count the files
+ bool files_are_newer = false;
+ dfe_start_read(journal_files_registry, jf) {
+ if(!jf_is_mine(jf, fqs))
+ continue;
+
+ file_items[files_used++] = dictionary_acquired_item_dup(journal_files_registry, jf_dfe.item);
+
+ if(jf->msg_last_ut > fqs->if_modified_since)
+ files_are_newer = true;
+ }
+ dfe_done(jf);
+
+ fqs->files_matched = files_used;
+
+ if(fqs->if_modified_since && !files_are_newer) {
+ buffer_flush(wb);
+ return HTTP_RESP_NOT_MODIFIED;
+ }
+
+ // sort the files, so that they are optimal for facets
+ if(files_used >= 2) {
+ if (fqs->direction == FACETS_ANCHOR_DIRECTION_BACKWARD)
+ qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
+ journal_file_dict_items_backward_compar);
+ else
+ qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *),
+ journal_file_dict_items_forward_compar);
+ }
+
+ bool partial = false;
+ usec_t started_ut;
+ usec_t ended_ut = now_monotonic_usec();
+
+ buffer_json_member_add_array(wb, "_journal_files");
+ for(size_t f = 0; f < files_used ;f++) {
+ const char *filename = dictionary_acquired_item_name(file_items[f]);
+ jf = dictionary_acquired_item_value(file_items[f]);
+
+ if(!jf_is_mine(jf, fqs))
+ continue;
+
+ fqs->file_working++;
+ fqs->cached_count = 0;
+
+ size_t fs_calls = fstat_thread_calls;
+ size_t fs_cached = fstat_thread_cached_responses;
+ size_t rows_useful = fqs->rows_useful;
+ size_t rows_read = fqs->rows_read;
+ size_t bytes_read = fqs->bytes_read;
+ size_t matches_setup_ut = fqs->matches_setup_ut;
+
+ ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs);
+
+ rows_useful = fqs->rows_useful - rows_useful;
+ rows_read = fqs->rows_read - rows_read;
+ bytes_read = fqs->bytes_read - bytes_read;
+ matches_setup_ut = fqs->matches_setup_ut - matches_setup_ut;
+ fs_calls = fstat_thread_calls - fs_calls;
+ fs_cached = fstat_thread_cached_responses - fs_cached;
+
+ started_ut = ended_ut;
+ ended_ut = now_monotonic_usec();
+ usec_t duration_ut = ended_ut - started_ut;
+
+ buffer_json_add_array_item_object(wb); // journal file
+ {
+ // information about the file
+ buffer_json_member_add_string(wb, "_filename", filename);
+ buffer_json_member_add_uint64(wb, "_source_type", jf->source_type);
+ buffer_json_member_add_string(wb, "_source", string2str(jf->source));
+ buffer_json_member_add_uint64(wb, "_last_modified_ut", jf->file_last_modified_ut);
+ buffer_json_member_add_uint64(wb, "_msg_first_ut", jf->msg_first_ut);
+ buffer_json_member_add_uint64(wb, "_msg_last_ut", jf->msg_last_ut);
+ buffer_json_member_add_uint64(wb, "_journal_vs_realtime_delta_ut", jf->max_journal_vs_realtime_delta_ut);
+
+ // information about the current use of the file
+ buffer_json_member_add_uint64(wb, "duration_ut", ended_ut - started_ut);
+ buffer_json_member_add_uint64(wb, "rows_read", rows_read);
+ buffer_json_member_add_uint64(wb, "rows_useful", rows_useful);
+ buffer_json_member_add_double(wb, "rows_per_second", (double) rows_read / (double) duration_ut * (double) USEC_PER_SEC);
+ buffer_json_member_add_uint64(wb, "bytes_read", bytes_read);
+ buffer_json_member_add_double(wb, "bytes_per_second", (double) bytes_read / (double) duration_ut * (double) USEC_PER_SEC);
+ buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut);
+ buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls);
+ buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached);
+ }
+ buffer_json_object_close(wb); // journal file
+
+ bool stop = false;
+ switch(tmp_status) {
+ case ND_SD_JOURNAL_OK:
+ case ND_SD_JOURNAL_NO_FILE_MATCHED:
+ status = (status == ND_SD_JOURNAL_OK) ? ND_SD_JOURNAL_OK : tmp_status;
+ break;
+
+ case ND_SD_JOURNAL_FAILED_TO_OPEN:
+ case ND_SD_JOURNAL_FAILED_TO_SEEK:
+ partial = true;
+ if(status == ND_SD_JOURNAL_NO_FILE_MATCHED)
+ status = tmp_status;
+ break;
+
+ case ND_SD_JOURNAL_CANCELLED:
+ case ND_SD_JOURNAL_TIMED_OUT:
+ partial = true;
+ stop = true;
+ status = tmp_status;
+ break;
+
+ case ND_SD_JOURNAL_NOT_MODIFIED:
+ internal_fatal(true, "this should never be returned here");
+ break;
+ }
+
+ if(stop)
+ break;
+ }
+ buffer_json_array_close(wb); // _journal_files
+
+ // release the files
+ for(size_t f = 0; f < files_used ;f++)
+ dictionary_acquired_item_release(journal_files_registry, file_items[f]);
+
+ switch (status) {
+ case ND_SD_JOURNAL_OK:
+ if(fqs->if_modified_since && !fqs->rows_useful) {
+ buffer_flush(wb);
+ return HTTP_RESP_NOT_MODIFIED;
+ }
+ break;
+
+ case ND_SD_JOURNAL_TIMED_OUT:
+ case ND_SD_JOURNAL_NO_FILE_MATCHED:
+ break;
+
+ case ND_SD_JOURNAL_CANCELLED:
+ buffer_flush(wb);
+ return HTTP_RESP_CLIENT_CLOSED_REQUEST;
+
+ case ND_SD_JOURNAL_NOT_MODIFIED:
+ buffer_flush(wb);
+ return HTTP_RESP_NOT_MODIFIED;
+
+ default:
+ case ND_SD_JOURNAL_FAILED_TO_OPEN:
+ case ND_SD_JOURNAL_FAILED_TO_SEEK:
+ buffer_flush(wb);
+ return HTTP_RESP_INTERNAL_SERVER_ERROR;
+ }
buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
- buffer_json_member_add_boolean(wb, "partial", timed_out);
+ buffer_json_member_add_boolean(wb, "partial", partial);
buffer_json_member_add_string(wb, "type", "table");
- buffer_json_member_add_time_t(wb, "update_every", 1);
- buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
- facets_report(facets, wb);
+ if(!fqs->data_only) {
+ buffer_json_member_add_time_t(wb, "update_every", 1);
+ buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+ }
+
+ if(!fqs->data_only || fqs->tail)
+ buffer_json_member_add_uint64(wb, "last_modified", fqs->last_modified);
+
+ facets_sort_and_reorder_keys(facets);
+ facets_report(facets, wb, used_hashes_registry);
+
+ buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (fqs->data_only ? 3600 : 0));
- buffer_json_member_add_time_t(wb, "expires", now_realtime_sec());
+ buffer_json_member_add_object(wb, "_fstat_caching");
+ {
+ buffer_json_member_add_uint64(wb, "calls", fstat_thread_calls);
+ buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses);
+ }
+ buffer_json_object_close(wb); // _fstat_caching
buffer_json_finalize(wb);
return HTTP_RESP_OK;
}
-static void systemd_journal_function_help(const char *transaction) {
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600);
- fprintf(stdout,
+static void netdata_systemd_journal_function_help(const char *transaction) {
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_sprintf(wb,
"%s / %s\n"
"\n"
"%s\n"
"\n"
- "The following filters are supported:\n"
+ "The following parameters are supported:\n"
"\n"
- " help\n"
+ " "JOURNAL_PARAMETER_HELP"\n"
" Shows this help message.\n"
"\n"
- " before:TIMESTAMP\n"
+ " "JOURNAL_PARAMETER_INFO"\n"
+ " Request initial configuration information about the plugin.\n"
+ " The key entity returned is the required_params array, which includes\n"
+ " all the available systemd journal sources.\n"
+ " When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_ID":STRING\n"
+ " Caller supplied unique ID of the request.\n"
+ " This can be used later to request a progress report of the query.\n"
+ " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_PROGRESS"\n"
+ " Request a progress report (the `id` of a running query is required).\n"
+ " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n"
+ " Quickly respond with data requested, without generating a\n"
+ " `histogram`, `facets` counters and `items`.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_DELTA":true or "JOURNAL_PARAMETER_DELTA":false\n"
+ " When doing data only queries, include deltas for histogram, facets and items.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_TAIL":true or "JOURNAL_PARAMETER_TAIL":false\n"
+ " When doing data only queries, respond with the newest messages,\n"
+ " and up to the anchor, but calculate deltas (if requested) for\n"
+ " the duration [anchor - before].\n"
+ "\n"
+ " "JOURNAL_PARAMETER_SLICE":true or "JOURNAL_PARAMETER_SLICE":false\n"
+ " When it is turned on, the plugin is executing filtering via libsystemd,\n"
+ " utilizing all the available indexes of the journal files.\n"
+ " When it is off, only the time constraint is handled by libsystemd and\n"
+ " all filtering is done by the plugin.\n"
+ " The default is: %s\n"
+ "\n"
+ " "JOURNAL_PARAMETER_SOURCE":SOURCE\n"
+ " Query only the specified journal sources.\n"
+ " Do an `"JOURNAL_PARAMETER_INFO"` query to find the sources.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_BEFORE":TIMESTAMP_IN_SECONDS\n"
" Absolute or relative (to now) timestamp in seconds, to start the query.\n"
" The query is always executed from the most recent to the oldest log entry.\n"
" If not given the default is: now.\n"
"\n"
- " after:TIMESTAMP\n"
+ " "JOURNAL_PARAMETER_AFTER":TIMESTAMP_IN_SECONDS\n"
" Absolute or relative (to `before`) timestamp in seconds, to end the query.\n"
" If not given, the default is %d.\n"
"\n"
- " last:ITEMS\n"
+ " "JOURNAL_PARAMETER_LAST":ITEMS\n"
" The number of items to return.\n"
" The default is %d.\n"
"\n"
- " anchor:NUMBER\n"
- " The `timestamp` of the item last received, to return log entries after that.\n"
- " If not given, the query will return the top `ITEMS` from the most recent.\n"
+ " "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n"
+ " Return items relative to this timestamp.\n"
+ " The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_DIRECTION":forward or "JOURNAL_PARAMETER_DIRECTION":backward\n"
+ " When set to `backward` (default) the items returned are the newest before the\n"
+ " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_BEFORE"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n"
+ " When set to `forward` the items returned are the oldest after the\n"
+ " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_AFTER"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n"
+ " The default is: %s\n"
+ "\n"
+ " "JOURNAL_PARAMETER_QUERY":SIMPLE_PATTERN\n"
+ " Do a full text search to find the log entries matching the pattern given.\n"
+ " The plugin is searching for matches on all fields of the database.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_IF_MODIFIED_SINCE":TIMESTAMP_IN_MICROSECONDS\n"
+ " Each successful response, includes a `last_modified` field.\n"
+ " By providing the timestamp to the `"JOURNAL_PARAMETER_IF_MODIFIED_SINCE"` parameter,\n"
+ " the plugin will return 200 with a successful response, or 304 if the source has not\n"
+ " been modified since that timestamp.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_HISTOGRAM":facet_id\n"
+ " Use the given `facet_id` for the histogram.\n"
+ " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n"
+ "\n"
+ " "JOURNAL_PARAMETER_FACETS":facet_id1,facet_id2,facet_id3,...\n"
+ " Add the given facets to the list of fields for which analysis is required.\n"
+ " The plugin will offer both a histogram and facet value counters for its values.\n"
+ " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n"
"\n"
" facet_id:value_id1,value_id2,value_id3,...\n"
" Apply filters to the query, based on the facet IDs returned.\n"
" Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n"
"\n"
- "Filters can be combined. Each filter can be given only one time.\n"
, program_name
, SYSTEMD_JOURNAL_FUNCTION_NAME
, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION
+ , JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice
, -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION
, SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY
+ , JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward"
);
- pluginsd_function_result_end_to_stdout();
+
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb);
+ netdata_mutex_unlock(&stdout_mutex);
+
+ buffer_free(wb);
}
+const char *errno_map[] = {
+ [1] = "1 (EPERM)", // "Operation not permitted",
+ [2] = "2 (ENOENT)", // "No such file or directory",
+ [3] = "3 (ESRCH)", // "No such process",
+ [4] = "4 (EINTR)", // "Interrupted system call",
+ [5] = "5 (EIO)", // "Input/output error",
+ [6] = "6 (ENXIO)", // "No such device or address",
+ [7] = "7 (E2BIG)", // "Argument list too long",
+ [8] = "8 (ENOEXEC)", // "Exec format error",
+ [9] = "9 (EBADF)", // "Bad file descriptor",
+ [10] = "10 (ECHILD)", // "No child processes",
+ [11] = "11 (EAGAIN)", // "Resource temporarily unavailable",
+ [12] = "12 (ENOMEM)", // "Cannot allocate memory",
+ [13] = "13 (EACCES)", // "Permission denied",
+ [14] = "14 (EFAULT)", // "Bad address",
+ [15] = "15 (ENOTBLK)", // "Block device required",
+ [16] = "16 (EBUSY)", // "Device or resource busy",
+ [17] = "17 (EEXIST)", // "File exists",
+ [18] = "18 (EXDEV)", // "Invalid cross-device link",
+ [19] = "19 (ENODEV)", // "No such device",
+ [20] = "20 (ENOTDIR)", // "Not a directory",
+ [21] = "21 (EISDIR)", // "Is a directory",
+ [22] = "22 (EINVAL)", // "Invalid argument",
+ [23] = "23 (ENFILE)", // "Too many open files in system",
+ [24] = "24 (EMFILE)", // "Too many open files",
+ [25] = "25 (ENOTTY)", // "Inappropriate ioctl for device",
+ [26] = "26 (ETXTBSY)", // "Text file busy",
+ [27] = "27 (EFBIG)", // "File too large",
+ [28] = "28 (ENOSPC)", // "No space left on device",
+ [29] = "29 (ESPIPE)", // "Illegal seek",
+ [30] = "30 (EROFS)", // "Read-only file system",
+ [31] = "31 (EMLINK)", // "Too many links",
+ [32] = "32 (EPIPE)", // "Broken pipe",
+ [33] = "33 (EDOM)", // "Numerical argument out of domain",
+ [34] = "34 (ERANGE)", // "Numerical result out of range",
+ [35] = "35 (EDEADLK)", // "Resource deadlock avoided",
+ [36] = "36 (ENAMETOOLONG)", // "File name too long",
+ [37] = "37 (ENOLCK)", // "No locks available",
+ [38] = "38 (ENOSYS)", // "Function not implemented",
+ [39] = "39 (ENOTEMPTY)", // "Directory not empty",
+ [40] = "40 (ELOOP)", // "Too many levels of symbolic links",
+ [42] = "42 (ENOMSG)", // "No message of desired type",
+ [43] = "43 (EIDRM)", // "Identifier removed",
+ [44] = "44 (ECHRNG)", // "Channel number out of range",
+ [45] = "45 (EL2NSYNC)", // "Level 2 not synchronized",
+ [46] = "46 (EL3HLT)", // "Level 3 halted",
+ [47] = "47 (EL3RST)", // "Level 3 reset",
+ [48] = "48 (ELNRNG)", // "Link number out of range",
+ [49] = "49 (EUNATCH)", // "Protocol driver not attached",
+ [50] = "50 (ENOCSI)", // "No CSI structure available",
+ [51] = "51 (EL2HLT)", // "Level 2 halted",
+ [52] = "52 (EBADE)", // "Invalid exchange",
+ [53] = "53 (EBADR)", // "Invalid request descriptor",
+ [54] = "54 (EXFULL)", // "Exchange full",
+ [55] = "55 (ENOANO)", // "No anode",
+ [56] = "56 (EBADRQC)", // "Invalid request code",
+ [57] = "57 (EBADSLT)", // "Invalid slot",
+ [59] = "59 (EBFONT)", // "Bad font file format",
+ [60] = "60 (ENOSTR)", // "Device not a stream",
+ [61] = "61 (ENODATA)", // "No data available",
+ [62] = "62 (ETIME)", // "Timer expired",
+ [63] = "63 (ENOSR)", // "Out of streams resources",
+ [64] = "64 (ENONET)", // "Machine is not on the network",
+ [65] = "65 (ENOPKG)", // "Package not installed",
+ [66] = "66 (EREMOTE)", // "Object is remote",
+ [67] = "67 (ENOLINK)", // "Link has been severed",
+ [68] = "68 (EADV)", // "Advertise error",
+ [69] = "69 (ESRMNT)", // "Srmount error",
+ [70] = "70 (ECOMM)", // "Communication error on send",
+ [71] = "71 (EPROTO)", // "Protocol error",
+ [72] = "72 (EMULTIHOP)", // "Multihop attempted",
+ [73] = "73 (EDOTDOT)", // "RFS specific error",
+ [74] = "74 (EBADMSG)", // "Bad message",
+ [75] = "75 (EOVERFLOW)", // "Value too large for defined data type",
+ [76] = "76 (ENOTUNIQ)", // "Name not unique on network",
+ [77] = "77 (EBADFD)", // "File descriptor in bad state",
+ [78] = "78 (EREMCHG)", // "Remote address changed",
+ [79] = "79 (ELIBACC)", // "Can not access a needed shared library",
+ [80] = "80 (ELIBBAD)", // "Accessing a corrupted shared library",
+ [81] = "81 (ELIBSCN)", // ".lib section in a.out corrupted",
+ [82] = "82 (ELIBMAX)", // "Attempting to link in too many shared libraries",
+ [83] = "83 (ELIBEXEC)", // "Cannot exec a shared library directly",
+ [84] = "84 (EILSEQ)", // "Invalid or incomplete multibyte or wide character",
+ [85] = "85 (ERESTART)", // "Interrupted system call should be restarted",
+ [86] = "86 (ESTRPIPE)", // "Streams pipe error",
+ [87] = "87 (EUSERS)", // "Too many users",
+ [88] = "88 (ENOTSOCK)", // "Socket operation on non-socket",
+ [89] = "89 (EDESTADDRREQ)", // "Destination address required",
+ [90] = "90 (EMSGSIZE)", // "Message too long",
+ [91] = "91 (EPROTOTYPE)", // "Protocol wrong type for socket",
+ [92] = "92 (ENOPROTOOPT)", // "Protocol not available",
+ [93] = "93 (EPROTONOSUPPORT)", // "Protocol not supported",
+ [94] = "94 (ESOCKTNOSUPPORT)", // "Socket type not supported",
+ [95] = "95 (ENOTSUP)", // "Operation not supported",
+ [96] = "96 (EPFNOSUPPORT)", // "Protocol family not supported",
+ [97] = "97 (EAFNOSUPPORT)", // "Address family not supported by protocol",
+ [98] = "98 (EADDRINUSE)", // "Address already in use",
+ [99] = "99 (EADDRNOTAVAIL)", // "Cannot assign requested address",
+ [100] = "100 (ENETDOWN)", // "Network is down",
+ [101] = "101 (ENETUNREACH)", // "Network is unreachable",
+ [102] = "102 (ENETRESET)", // "Network dropped connection on reset",
+ [103] = "103 (ECONNABORTED)", // "Software caused connection abort",
+ [104] = "104 (ECONNRESET)", // "Connection reset by peer",
+ [105] = "105 (ENOBUFS)", // "No buffer space available",
+ [106] = "106 (EISCONN)", // "Transport endpoint is already connected",
+ [107] = "107 (ENOTCONN)", // "Transport endpoint is not connected",
+ [108] = "108 (ESHUTDOWN)", // "Cannot send after transport endpoint shutdown",
+ [109] = "109 (ETOOMANYREFS)", // "Too many references: cannot splice",
+ [110] = "110 (ETIMEDOUT)", // "Connection timed out",
+ [111] = "111 (ECONNREFUSED)", // "Connection refused",
+ [112] = "112 (EHOSTDOWN)", // "Host is down",
+ [113] = "113 (EHOSTUNREACH)", // "No route to host",
+ [114] = "114 (EALREADY)", // "Operation already in progress",
+ [115] = "115 (EINPROGRESS)", // "Operation now in progress",
+ [116] = "116 (ESTALE)", // "Stale file handle",
+ [117] = "117 (EUCLEAN)", // "Structure needs cleaning",
+ [118] = "118 (ENOTNAM)", // "Not a XENIX named type file",
+ [119] = "119 (ENAVAIL)", // "No XENIX semaphores available",
+ [120] = "120 (EISNAM)", // "Is a named type file",
+ [121] = "121 (EREMOTEIO)", // "Remote I/O error",
+ [122] = "122 (EDQUOT)", // "Disk quota exceeded",
+ [123] = "123 (ENOMEDIUM)", // "No medium found",
+ [124] = "124 (EMEDIUMTYPE)", // "Wrong medium type",
+ [125] = "125 (ECANCELED)", // "Operation canceled",
+ [126] = "126 (ENOKEY)", // "Required key not available",
+ [127] = "127 (EKEYEXPIRED)", // "Key has expired",
+ [128] = "128 (EKEYREVOKED)", // "Key has been revoked",
+ [129] = "129 (EKEYREJECTED)", // "Key was rejected by service",
+ [130] = "130 (EOWNERDEAD)", // "Owner died",
+ [131] = "131 (ENOTRECOVERABLE)", // "State not recoverable",
+ [132] = "132 (ERFKILL)", // "Operation not possible due to RF-kill",
+ [133] = "133 (EHWPOISON)", // "Memory page has hardware error",
+};
+
static const char *syslog_facility_to_name(int facility) {
switch (facility) {
case LOG_FAC(LOG_KERN): return "kern";
@@ -216,31 +1690,57 @@ static const char *syslog_priority_to_name(int priority) {
}
}
+static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets __maybe_unused, FACET_ROW *row, void *data __maybe_unused) {
+ // same to
+ // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501
+ // function get_log_colors()
+
+ FACET_ROW_KEY_VALUE *priority_rkv = dictionary_get(row->dict, "PRIORITY");
+ if(!priority_rkv || priority_rkv->empty)
+ return FACET_ROW_SEVERITY_NORMAL;
+
+ int priority = str2i(buffer_tostring(priority_rkv->wb));
+
+ if(priority <= LOG_ERR)
+ return FACET_ROW_SEVERITY_CRITICAL;
+
+ else if (priority <= LOG_WARNING)
+ return FACET_ROW_SEVERITY_WARNING;
+
+ else if(priority <= LOG_NOTICE)
+ return FACET_ROW_SEVERITY_NOTICE;
+
+ else if(priority >= LOG_DEBUG)
+ return FACET_ROW_SEVERITY_DEBUG;
+
+ return FACET_ROW_SEVERITY_NORMAL;
+}
+
static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) {
- struct passwd pw, *result;
- char tmp[1024 + 1];
+ static __thread char tmp[1024 + 1];
+ struct passwd pw, *result = NULL;
- if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL)
- return NULL;
+ if (getpwuid_r(uid, &pw, tmp, sizeof(tmp), &result) != 0 || !result || !pw.pw_name || !(*pw.pw_name))
+ snprintfz(buffer, buffer_size - 1, "%u", uid);
+ else
+ snprintfz(buffer, buffer_size - 1, "%u (%s)", uid, pw.pw_name);
- strncpy(buffer, pw.pw_name, buffer_size - 1);
- buffer[buffer_size - 1] = '\0'; // Null-terminate just in case
return buffer;
}
static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) {
- struct group grp, *result;
- char tmp[1024 + 1];
+ static __thread char tmp[1024];
+ struct group grp, *result = NULL;
- if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL)
- return NULL;
+ if (getgrgid_r(gid, &grp, tmp, sizeof(tmp), &result) != 0 || !result || !grp.gr_name || !(*grp.gr_name))
+ snprintfz(buffer, buffer_size - 1, "%u", gid);
+ else
+ snprintfz(buffer, buffer_size - 1, "%u (%s)", gid, grp.gr_name);
- strncpy(buffer, grp.gr_name, buffer_size - 1);
- buffer[buffer_size - 1] = '\0'; // Null-terminate just in case
return buffer;
}
-static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
int facility = str2i(buffer_tostring(wb));
@@ -252,7 +1752,10 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu
}
}
-static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) {
+static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ if(scope == FACETS_TRANSFORM_FACET_SORT)
+ return;
+
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
int priority = str2i(buffer_tostring(wb));
@@ -264,141 +1767,663 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU
}
}
-static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
- DICTIONARY *cache = data;
+static void netdata_systemd_journal_transform_errno(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ if(scope == FACETS_TRANSFORM_FACET_SORT)
+ return;
+
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
- const char *sv = dictionary_get(cache, v);
- if(!sv) {
- char buf[1024 + 1];
- int uid = str2i(buffer_tostring(wb));
- const char *name = uid_to_username(uid, buf, 1024);
- if (!name)
- name = v;
+ unsigned err_no = str2u(buffer_tostring(wb));
+ if(err_no > 0 && err_no < sizeof(errno_map) / sizeof(*errno_map)) {
+ const char *name = errno_map[err_no];
+ if(name) {
+ buffer_flush(wb);
+ buffer_strcat(wb, name);
+ }
+ }
+ }
+}
+
+// ----------------------------------------------------------------------------
+// UID and GID transformation
+
+#define UID_GID_HASHTABLE_SIZE 10000
+
+struct word_t2str_hashtable_entry {
+ struct word_t2str_hashtable_entry *next;
+ Word_t hash;
+ size_t len;
+ char str[];
+};
+
+struct word_t2str_hashtable {
+ SPINLOCK spinlock;
+ size_t size;
+ struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE];
+};
+
+struct word_t2str_hashtable uid_hashtable = {
+ .size = UID_GID_HASHTABLE_SIZE,
+};
+
+struct word_t2str_hashtable gid_hashtable = {
+ .size = UID_GID_HASHTABLE_SIZE,
+};
+
+struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) {
+ size_t slot = hash % ht->size;
+ struct word_t2str_hashtable_entry **e = &ht->hashtable[slot];
+
+ while(*e && (*e)->hash != hash)
+ e = &((*e)->next);
+
+ return e;
+}
+
+const char *uid_to_username_cached(uid_t uid, size_t *length) {
+ spinlock_lock(&uid_hashtable.spinlock);
+
+ struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid);
+ if(!(*e)) {
+ static __thread char buf[1024];
+ const char *name = uid_to_username(uid, buf, sizeof(buf));
+ size_t size = strlen(name) + 1;
+
+ *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
+ (*e)->len = size - 1;
+ (*e)->hash = uid;
+ memcpy((*e)->str, name, size);
+ }
+
+ spinlock_unlock(&uid_hashtable.spinlock);
- sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
+ *length = (*e)->len;
+ return (*e)->str;
+}
+
+const char *gid_to_groupname_cached(gid_t gid, size_t *length) {
+ spinlock_lock(&gid_hashtable.spinlock);
+
+ struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid);
+ if(!(*e)) {
+ static __thread char buf[1024];
+ const char *name = gid_to_groupname(gid, buf, sizeof(buf));
+ size_t size = strlen(name) + 1;
+
+ *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size);
+ (*e)->len = size - 1;
+ (*e)->hash = gid;
+ memcpy((*e)->str, name, size);
+ }
+
+ spinlock_unlock(&gid_hashtable.spinlock);
+
+ *length = (*e)->len;
+ return (*e)->str;
+}
+
+DICTIONARY *boot_ids_to_first_ut = NULL;
+
+static void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ const char *boot_id = buffer_tostring(wb);
+ if(*boot_id && isxdigit(*boot_id)) {
+ usec_t ut = UINT64_MAX;
+ usec_t *p_ut = dictionary_get(boot_ids_to_first_ut, boot_id);
+ if(!p_ut) {
+ struct journal_file *jf;
+ dfe_start_read(journal_files_registry, jf) {
+ const char *files[2] = {
+ [0] = jf_dfe.name,
+ [1] = NULL,
+ };
+
+ sd_journal *j = NULL;
+ if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j)
+ continue;
+
+ char m[100];
+ size_t len = snprintfz(m, sizeof(m), "_BOOT_ID=%s", boot_id);
+ usec_t t_ut = 0;
+ if(sd_journal_add_match(j, m, len) < 0 ||
+ sd_journal_seek_head(j) < 0 ||
+ sd_journal_next(j) < 0 ||
+ sd_journal_get_realtime_usec(j, &t_ut) < 0 || !t_ut) {
+ sd_journal_close(j);
+ continue;
+ }
+
+ if(t_ut < ut)
+ ut = t_ut;
+
+ sd_journal_close(j);
+ }
+ dfe_done(jf);
+
+ dictionary_set(boot_ids_to_first_ut, boot_id, &ut, sizeof(ut));
}
+ else
+ ut = *p_ut;
+
+ if(ut != UINT64_MAX) {
+ time_t timestamp_sec = (time_t)(ut / USEC_PER_SEC);
+ struct tm tm;
+ char buffer[30];
+
+ gmtime_r(&timestamp_sec, &tm);
+ strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm);
+
+ switch(scope) {
+ default:
+ case FACETS_TRANSFORM_DATA:
+ case FACETS_TRANSFORM_VALUE:
+ buffer_sprintf(wb, " (%s UTC) ", buffer);
+ break;
+
+ case FACETS_TRANSFORM_FACET:
+ case FACETS_TRANSFORM_FACET_SORT:
+ case FACETS_TRANSFORM_HISTOGRAM:
+ buffer_flush(wb);
+ buffer_sprintf(wb, "%s UTC", buffer);
+ break;
+ }
+ }
+ }
+}
- buffer_flush(wb);
- buffer_strcat(wb, sv);
+static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ if(scope == FACETS_TRANSFORM_FACET_SORT)
+ return;
+
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ uid_t uid = str2i(buffer_tostring(wb));
+ size_t len;
+ const char *name = uid_to_username_cached(uid, &len);
+ buffer_contents_replace(wb, name, len);
}
}
-static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) {
- DICTIONARY *cache = data;
+static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ if(scope == FACETS_TRANSFORM_FACET_SORT)
+ return;
+
const char *v = buffer_tostring(wb);
if(*v && isdigit(*v)) {
- const char *sv = dictionary_get(cache, v);
- if(!sv) {
- char buf[1024 + 1];
- int gid = str2i(buffer_tostring(wb));
- const char *name = gid_to_groupname(gid, buf, 1024);
- if (!name)
- name = v;
+ gid_t gid = str2i(buffer_tostring(wb));
+ size_t len;
+ const char *name = gid_to_groupname_cached(gid, &len);
+ buffer_contents_replace(wb, name, len);
+ }
+}
- sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1);
+const char *linux_capabilities[] = {
+ [CAP_CHOWN] = "CHOWN",
+ [CAP_DAC_OVERRIDE] = "DAC_OVERRIDE",
+ [CAP_DAC_READ_SEARCH] = "DAC_READ_SEARCH",
+ [CAP_FOWNER] = "FOWNER",
+ [CAP_FSETID] = "FSETID",
+ [CAP_KILL] = "KILL",
+ [CAP_SETGID] = "SETGID",
+ [CAP_SETUID] = "SETUID",
+ [CAP_SETPCAP] = "SETPCAP",
+ [CAP_LINUX_IMMUTABLE] = "LINUX_IMMUTABLE",
+ [CAP_NET_BIND_SERVICE] = "NET_BIND_SERVICE",
+ [CAP_NET_BROADCAST] = "NET_BROADCAST",
+ [CAP_NET_ADMIN] = "NET_ADMIN",
+ [CAP_NET_RAW] = "NET_RAW",
+ [CAP_IPC_LOCK] = "IPC_LOCK",
+ [CAP_IPC_OWNER] = "IPC_OWNER",
+ [CAP_SYS_MODULE] = "SYS_MODULE",
+ [CAP_SYS_RAWIO] = "SYS_RAWIO",
+ [CAP_SYS_CHROOT] = "SYS_CHROOT",
+ [CAP_SYS_PTRACE] = "SYS_PTRACE",
+ [CAP_SYS_PACCT] = "SYS_PACCT",
+ [CAP_SYS_ADMIN] = "SYS_ADMIN",
+ [CAP_SYS_BOOT] = "SYS_BOOT",
+ [CAP_SYS_NICE] = "SYS_NICE",
+ [CAP_SYS_RESOURCE] = "SYS_RESOURCE",
+ [CAP_SYS_TIME] = "SYS_TIME",
+ [CAP_SYS_TTY_CONFIG] = "SYS_TTY_CONFIG",
+ [CAP_MKNOD] = "MKNOD",
+ [CAP_LEASE] = "LEASE",
+ [CAP_AUDIT_WRITE] = "AUDIT_WRITE",
+ [CAP_AUDIT_CONTROL] = "AUDIT_CONTROL",
+ [CAP_SETFCAP] = "SETFCAP",
+ [CAP_MAC_OVERRIDE] = "MAC_OVERRIDE",
+ [CAP_MAC_ADMIN] = "MAC_ADMIN",
+ [CAP_SYSLOG] = "SYSLOG",
+ [CAP_WAKE_ALARM] = "WAKE_ALARM",
+ [CAP_BLOCK_SUSPEND] = "BLOCK_SUSPEND",
+ [37 /*CAP_AUDIT_READ*/] = "AUDIT_READ",
+ [38 /*CAP_PERFMON*/] = "PERFMON",
+ [39 /*CAP_BPF*/] = "BPF",
+ [40 /* CAP_CHECKPOINT_RESTORE */] = "CHECKPOINT_RESTORE",
+};
+
+static void netdata_systemd_journal_transform_cap_effective(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ if(scope == FACETS_TRANSFORM_FACET_SORT)
+ return;
+
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ uint64_t cap = strtoul(buffer_tostring(wb), NULL, 16);
+ if(cap) {
+ buffer_fast_strcat(wb, " (", 2);
+ for (size_t i = 0, added = 0; i < sizeof(linux_capabilities) / sizeof(linux_capabilities[0]); i++) {
+ if (linux_capabilities[i] && (cap & (1ULL << i))) {
+
+ if (added)
+ buffer_fast_strcat(wb, " | ", 3);
+
+ buffer_strcat(wb, linux_capabilities[i]);
+ added++;
+ }
+ }
+ buffer_fast_strcat(wb, ")", 1);
}
+ }
+}
- buffer_flush(wb);
- buffer_strcat(wb, sv);
+static void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) {
+ if(scope == FACETS_TRANSFORM_FACET_SORT)
+ return;
+
+ const char *v = buffer_tostring(wb);
+ if(*v && isdigit(*v)) {
+ uint64_t ut = str2ull(buffer_tostring(wb), NULL);
+ if(ut) {
+ time_t timestamp_sec = ut / USEC_PER_SEC;
+ struct tm tm;
+ char buffer[30];
+
+ gmtime_r(&timestamp_sec, &tm);
+ strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm);
+ buffer_sprintf(wb, " (%s.%06llu UTC)", buffer, ut % USEC_PER_SEC);
+ }
}
}
-static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
+// ----------------------------------------------------------------------------
+
+static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) {
FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID");
const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET;
- FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER");
- const char *identifier = syslog_identifier_rkv ? buffer_tostring(syslog_identifier_rkv->wb) : FACET_VALUE_UNSET;
+ const char *identifier = NULL;
+ FACET_ROW_KEY_VALUE *container_name_rkv = dictionary_get(row->dict, "CONTAINER_NAME");
+ if(container_name_rkv && !container_name_rkv->empty)
+ identifier = buffer_tostring(container_name_rkv->wb);
- if(strcmp(identifier, FACET_VALUE_UNSET) == 0) {
- FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM");
- identifier = comm_rkv ? buffer_tostring(comm_rkv->wb) : FACET_VALUE_UNSET;
+ if(!identifier) {
+ FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER");
+ if(syslog_identifier_rkv && !syslog_identifier_rkv->empty)
+ identifier = buffer_tostring(syslog_identifier_rkv->wb);
+
+ if(!identifier) {
+ FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM");
+ if(comm_rkv && !comm_rkv->empty)
+ identifier = buffer_tostring(comm_rkv->wb);
+ }
}
buffer_flush(rkv->wb);
- if(strcmp(pid, FACET_VALUE_UNSET) == 0)
- buffer_strcat(rkv->wb, identifier);
+ if(!identifier)
+ buffer_strcat(rkv->wb, FACET_VALUE_UNSET);
else
buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid);
buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb));
}
-static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) {
- char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS);
+static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) {
+ buffer_json_add_array_item_object(json_array);
+ buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb));
+ buffer_json_object_close(json_array);
+}
+
+DICTIONARY *function_query_status_dict = NULL;
+
+static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) {
+ if(!progress_id || !(*progress_id)) {
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id");
+ netdata_mutex_unlock(&stdout_mutex);
+ return;
+ }
+
+ const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id);
+
+ if(!item) {
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here");
+ netdata_mutex_unlock(&stdout_mutex);
+ return;
+ }
+
+ FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item);
+
+ usec_t now_monotonic_ut = now_monotonic_usec();
+ if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut)
+ fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC;
+
+ usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut;
+
+ size_t files_matched = fqs->files_matched;
+ size_t file_working = fqs->file_working;
+ if(file_working > files_matched)
+ files_matched = file_working;
+
+ size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED);
+ size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED);
- BUFFER *wb = buffer_create(0, NULL);
buffer_flush(wb);
- buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
+ buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
+ buffer_json_member_add_string(wb, "type", "table");
+ buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut);
+ buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched);
+ char msg[1024 + 1];
+ snprintfz(msg, 1024,
+ "Read %zu rows (%0.0f rows/s), "
+ "data %0.1f MB (%0.1f MB/s), "
+ "file %zu of %zu",
+ rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC,
+ (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0,
+ file_working, files_matched
+ );
+ buffer_json_member_add_string(wb, "message", msg);
+ buffer_json_finalize(wb);
+
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb);
+ netdata_mutex_unlock(&stdout_mutex);
- FACETS *facets = facets_create(50, 0, FACETS_OPTION_ALL_KEYS_FTS,
+ dictionary_acquired_item_release(function_query_status_dict, item);
+}
+
+static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
+ fstat_thread_calls = 0;
+ fstat_thread_cached_responses = 0;
+ journal_files_registry_update();
+
+ BUFFER *wb = buffer_create(0, NULL);
+ buffer_flush(wb);
+ buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
+
+ usec_t now_monotonic_ut = now_monotonic_usec();
+ FUNCTION_QUERY_STATUS tmp_fqs = {
+ .cancelled = cancelled,
+ .started_monotonic_ut = now_monotonic_ut,
+ .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC),
+ };
+ FUNCTION_QUERY_STATUS *fqs = NULL;
+ const DICTIONARY_ITEM *fqs_item = NULL;
+
+ FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS,
SYSTEMD_ALWAYS_VISIBLE_KEYS,
SYSTEMD_KEYS_INCLUDED_IN_FACETS,
SYSTEMD_KEYS_EXCLUDED_FROM_FACETS);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_INFO);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_SOURCE);
facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER);
facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE);
facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_DIRECTION);
facets_accepted_param(facets, JOURNAL_PARAMETER_LAST);
facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_ID);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA);
+ facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL);
+
+#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
+ facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE);
+#endif // HAVE_SD_JOURNAL_RESTART_FIELDS
// register the fields in the order you want them on the dashboard
- facets_register_dynamic_key(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS,
- systemd_journal_dynamic_row_id, NULL);
+ facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL);
+
+ facets_register_key_name(facets, "_HOSTNAME",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
+
+ facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS,
+ FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
+ netdata_systemd_journal_dynamic_row_id, NULL);
+
+ facets_register_key_name(facets, "MESSAGE",
+ FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT |
+ FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS);
+
+// facets_register_dynamic_key_name(facets, "MESSAGE",
+// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT |
+// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS,
+// netdata_systemd_journal_rich_message, NULL);
- facets_register_key(facets, "MESSAGE",
- FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_MAIN_TEXT|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS);
+ facets_register_key_name_transformation(facets, "PRIORITY",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_priority, NULL);
- facets_register_key_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
- systemd_journal_transform_priority, NULL);
+ facets_register_key_name_transformation(facets, "SYSLOG_FACILITY",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_syslog_facility, NULL);
- facets_register_key_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
- systemd_journal_transform_syslog_facility, NULL);
+ facets_register_key_name_transformation(facets, "ERRNO",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_errno, NULL);
- facets_register_key(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
- facets_register_key(facets, "UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
- facets_register_key(facets, "USER_UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS);
+ facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE,
+ FACET_KEY_OPTION_NEVER_FACET);
- facets_register_key_transformation(facets, "_UID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
- systemd_journal_transform_uid, uids);
+ facets_register_key_name(facets, "SYSLOG_IDENTIFIER",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
- facets_register_key_transformation(facets, "_GID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS,
- systemd_journal_transform_gid, gids);
+ facets_register_key_name(facets, "UNIT",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
+ facets_register_key_name(facets, "USER_UNIT",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS);
+
+ facets_register_key_name_transformation(facets, "_BOOT_ID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_boot_id, NULL);
+
+ facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_uid, NULL);
+
+ facets_register_key_name_transformation(facets, "_UID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_uid, NULL);
+
+ facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_uid, NULL);
+
+ facets_register_key_name_transformation(facets, "OBJECT_UID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_uid, NULL);
+
+ facets_register_key_name_transformation(facets, "_GID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_gid, NULL);
+
+ facets_register_key_name_transformation(facets, "OBJECT_GID",
+ FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_gid, NULL);
+
+ facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE",
+ FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_cap_effective, NULL);
+
+ facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID",
+ FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_uid, NULL);
+
+ facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID",
+ FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_uid, NULL);
+
+ facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP",
+ FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW,
+ netdata_systemd_journal_transform_timestamp_usec, NULL);
+
+ // ------------------------------------------------------------------------
+ // parse the parameters
+
+ bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false;
time_t after_s = 0, before_s = 0;
usec_t anchor = 0;
+ usec_t if_modified_since = 0;
size_t last = 0;
+ FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION;
const char *query = NULL;
+ const char *chart = NULL;
+ const char *source = NULL;
+ const char *progress_id = NULL;
+ SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL;
+ size_t filters = 0;
- buffer_json_member_add_object(wb, "request");
- buffer_json_member_add_object(wb, "filters");
+ buffer_json_member_add_object(wb, "_request");
+ char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL };
+ size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS);
for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) {
- const char *keyword = get_word(words, num_words, i);
+ char *keyword = get_word(words, num_words, i);
if(!keyword) break;
if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) {
- systemd_journal_function_help(transaction);
+ netdata_systemd_journal_function_help(transaction);
goto cleanup;
}
- else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", strlen(JOURNAL_PARAMETER_AFTER ":")) == 0) {
- after_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_AFTER ":")]);
+ else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) {
+ info = true;
}
- else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", strlen(JOURNAL_PARAMETER_BEFORE ":")) == 0) {
- before_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_BEFORE ":")]);
+ else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) {
+ progress = true;
}
- else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", strlen(JOURNAL_PARAMETER_ANCHOR ":")) == 0) {
- anchor = str2ull(&keyword[strlen(JOURNAL_PARAMETER_ANCHOR ":")], NULL);
+ else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) {
+ char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1];
+
+ if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
+ delta = false;
+ else
+ delta = true;
}
- else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", strlen(JOURNAL_PARAMETER_LAST ":")) == 0) {
- last = str2ul(&keyword[strlen(JOURNAL_PARAMETER_LAST ":")]);
+ else if(strncmp(keyword, JOURNAL_PARAMETER_TAIL ":", sizeof(JOURNAL_PARAMETER_TAIL ":") - 1) == 0) {
+ char *v = &keyword[sizeof(JOURNAL_PARAMETER_TAIL ":") - 1];
+
+ if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
+ tail = false;
+ else
+ tail = true;
}
- else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", strlen(JOURNAL_PARAMETER_QUERY ":")) == 0) {
- query= &keyword[strlen(JOURNAL_PARAMETER_QUERY ":")];
+ else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) {
+ char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1];
+
+ if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
+ data_only = false;
+ else
+ data_only = true;
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_SLICE ":", sizeof(JOURNAL_PARAMETER_SLICE ":") - 1) == 0) {
+ char *v = &keyword[sizeof(JOURNAL_PARAMETER_SLICE ":") - 1];
+
+ if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0)
+ slice = false;
+ else
+ slice = true;
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) {
+ char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1];
+
+ if(*id)
+ progress_id = id;
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
+ source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
+
+ if(strcmp(source, SDJF_SOURCE_ALL_NAME) == 0) {
+ source_type = SDJF_ALL;
+ source = NULL;
+ }
+ else if(strcmp(source, SDJF_SOURCE_LOCAL_NAME) == 0) {
+ source_type = SDJF_LOCAL;
+ source = NULL;
+ }
+ else if(strcmp(source, SDJF_SOURCE_REMOTES_NAME) == 0) {
+ source_type = SDJF_REMOTE;
+ source = NULL;
+ }
+ else if(strcmp(source, SDJF_SOURCE_NAMESPACES_NAME) == 0) {
+ source_type = SDJF_NAMESPACE;
+ source = NULL;
+ }
+ else if(strcmp(source, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) {
+ source_type = SDJF_LOCAL | SDJF_SYSTEM;
+ source = NULL;
+ }
+ else if(strcmp(source, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) {
+ source_type = SDJF_LOCAL | SDJF_USER;
+ source = NULL;
+ }
+ else if(strcmp(source, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) {
+ source_type = SDJF_LOCAL | SDJF_OTHER;
+ source = NULL;
+ }
+ else {
+ source_type = SDJF_ALL;
+ // else, match the source, whatever it is
+ }
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) {
+ after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1) == 0) {
+ before_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1]);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":", sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1) == 0) {
+ if_modified_since = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1], NULL);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1) == 0) {
+ anchor = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1], NULL);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_DIRECTION ":", sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1) == 0) {
+ direction = strcasecmp(&keyword[sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1], "forward") == 0 ? FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD;
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", sizeof(JOURNAL_PARAMETER_LAST ":") - 1) == 0) {
+ last = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_LAST ":") - 1]);
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", sizeof(JOURNAL_PARAMETER_QUERY ":") - 1) == 0) {
+ query= &keyword[sizeof(JOURNAL_PARAMETER_QUERY ":") - 1];
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_HISTOGRAM ":", sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1) == 0) {
+ chart = &keyword[sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1];
+ }
+ else if(strncmp(keyword, JOURNAL_PARAMETER_FACETS ":", sizeof(JOURNAL_PARAMETER_FACETS ":") - 1) == 0) {
+ char *value = &keyword[sizeof(JOURNAL_PARAMETER_FACETS ":") - 1];
+ if(*value) {
+ buffer_json_member_add_array(wb, JOURNAL_PARAMETER_FACETS);
+
+ while(value) {
+ char *sep = strchr(value, ',');
+ if(sep)
+ *sep++ = '\0';
+
+ facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
+ buffer_json_add_array_item_string(wb, value);
+
+ value = sep;
+ }
+
+ buffer_json_array_close(wb); // JOURNAL_PARAMETER_FACETS
+ }
}
else {
char *value = strchr(keyword, ':');
@@ -412,8 +2437,9 @@ static void function_systemd_journal(const char *transaction, char *function, ch
if(sep)
*sep++ = '\0';
- facets_register_facet_filter(facets, keyword, value, FACET_KEY_OPTION_REORDER);
+ facets_register_facet_id_filter(facets, keyword, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER);
buffer_json_add_array_item_string(wb, value);
+ filters++;
value = sep;
}
@@ -423,18 +2449,31 @@ static void function_systemd_journal(const char *transaction, char *function, ch
}
}
- buffer_json_object_close(wb); // filters
+ // ------------------------------------------------------------------------
+ // put this request into the progress db
+
+ if(progress_id && *progress_id) {
+ fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs));
+ fqs = dictionary_acquired_item_value(fqs_item);
+ }
+ else {
+ // no progress id given, proceed without registering our progress in the dictionary
+ fqs = &tmp_fqs;
+ fqs_item = NULL;
+ }
+
+ // ------------------------------------------------------------------------
+ // validate parameters
- time_t expires = now_realtime_sec() + 1;
- time_t now_s;
+ time_t now_s = now_realtime_sec();
+ time_t expires = now_s + 1;
if(!after_s && !before_s) {
- now_s = now_realtime_sec();
before_s = now_s;
after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION;
}
else
- rrdr_relative_window_to_absolute(&after_s, &before_s, &now_s, false);
+ rrdr_relative_window_to_absolute(&after_s, &before_s, now_s);
if(after_s > before_s) {
time_t tmp = after_s;
@@ -448,85 +2487,179 @@ static void function_systemd_journal(const char *transaction, char *function, ch
if(!last)
last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY;
- buffer_json_member_add_time_t(wb, "after", after_s);
- buffer_json_member_add_time_t(wb, "before", before_s);
- buffer_json_member_add_uint64(wb, "anchor", anchor);
- buffer_json_member_add_uint64(wb, "last", last);
- buffer_json_member_add_string(wb, "query", query);
- buffer_json_member_add_time_t(wb, "timeout", timeout);
- buffer_json_object_close(wb); // request
-
- facets_set_items(facets, last);
- facets_set_anchor(facets, anchor);
- facets_set_query(facets, query);
- int response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC,
- now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC);
- if(response != HTTP_RESP_OK) {
- pluginsd_function_json_error(transaction, response, "failed");
- goto cleanup;
+ // ------------------------------------------------------------------------
+ // set query time-frame, anchors and direction
+
+ fqs->after_ut = after_s * USEC_PER_SEC;
+ fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1;
+ fqs->if_modified_since = if_modified_since;
+ fqs->data_only = data_only;
+ fqs->delta = (fqs->data_only) ? delta : false;
+ fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false;
+ fqs->source = string_strdupz(source);
+ fqs->source_type = source_type;
+ fqs->entries = last;
+ fqs->last_modified = 0;
+ fqs->filters = filters;
+ fqs->query = (query && *query) ? query : NULL;
+ fqs->histogram = (chart && *chart) ? chart : NULL;
+ fqs->direction = direction;
+ fqs->anchor.start_ut = anchor;
+ fqs->anchor.stop_ut = 0;
+
+ if(fqs->anchor.start_ut && fqs->tail) {
+ // a tail request
+ // we need the top X entries from BEFORE
+ // but, we need to calculate the facets and the
+ // histogram up to the anchor
+ fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
+ fqs->anchor.start_ut = 0;
+ fqs->anchor.stop_ut = anchor;
}
- pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires);
- fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout);
+ if(anchor && anchor < fqs->after_ut) {
+ log_fqs(fqs, "received anchor is too small for query timeframe, ignoring anchor");
+ anchor = 0;
+ fqs->anchor.start_ut = 0;
+ fqs->anchor.stop_ut = 0;
+ fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
+ }
+ else if(anchor > fqs->before_ut) {
+ log_fqs(fqs, "received anchor is too big for query timeframe, ignoring anchor");
+ anchor = 0;
+ fqs->anchor.start_ut = 0;
+ fqs->anchor.stop_ut = 0;
+ fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD;
+ }
- pluginsd_function_result_end_to_stdout();
+ facets_set_anchor(facets, fqs->anchor.start_ut, fqs->anchor.stop_ut, fqs->direction);
-cleanup:
- facets_destroy(facets);
- buffer_free(wb);
-}
+ facets_set_additional_options(facets,
+ ((fqs->data_only) ? FACETS_OPTION_DATA_ONLY : 0) |
+ ((fqs->delta) ? FACETS_OPTION_SHOW_DELTAS : 0));
-static void *reader_main(void *arg __maybe_unused) {
- char buffer[PLUGINSD_LINE_MAX + 1];
+ // ------------------------------------------------------------------------
+ // set the rest of the query parameters
- char *s = NULL;
- while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
- char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS);
+ facets_set_items(facets, fqs->entries);
+ facets_set_query(facets, fqs->query);
- const char *keyword = get_word(words, num_words, 0);
+#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS
+ fqs->slice = slice;
+ if(slice)
+ facets_enable_slice_mode(facets);
+#else
+ fqs->slice = false;
+#endif
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ if(fqs->histogram)
+ facets_set_timeframe_and_histogram_by_id(facets, fqs->histogram, fqs->after_ut, fqs->before_ut);
+ else
+ facets_set_timeframe_and_histogram_by_name(facets, "PRIORITY", fqs->after_ut, fqs->before_ut);
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- keyword,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
- if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT;
- netdata_mutex_lock(&mutex);
+ // ------------------------------------------------------------------------
+ // complete the request object
+
+ buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false);
+ buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice);
+ buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only);
+ buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false);
+ buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta);
+ buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail);
+ buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id);
+ buffer_json_member_add_string(wb, JOURNAL_PARAMETER_SOURCE, string2str(fqs->source));
+ buffer_json_member_add_uint64(wb, "source_type", fqs->source_type);
+ buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC);
+ buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC);
+ buffer_json_member_add_uint64(wb, "if_modified_since", fqs->if_modified_since);
+ buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_ANCHOR, anchor);
+ buffer_json_member_add_string(wb, JOURNAL_PARAMETER_DIRECTION, fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward");
+ buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_LAST, fqs->entries);
+ buffer_json_member_add_string(wb, JOURNAL_PARAMETER_QUERY, fqs->query);
+ buffer_json_member_add_string(wb, JOURNAL_PARAMETER_HISTOGRAM, fqs->histogram);
+ buffer_json_object_close(wb); // request
- if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0)
- function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout);
- else
- pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in systemd-journal.plugin.");
+ buffer_json_journal_versions(wb);
- fflush(stdout);
- netdata_mutex_unlock(&mutex);
+ // ------------------------------------------------------------------------
+ // run the request
+
+ int response;
+
+ if(info) {
+ facets_accepted_parameters_to_json_array(facets, wb, false);
+ buffer_json_member_add_array(wb, "required_params");
+ {
+ buffer_json_add_array_item_object(wb);
+ {
+ buffer_json_member_add_string(wb, "id", "source");
+ buffer_json_member_add_string(wb, "name", "source");
+ buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query");
+ buffer_json_member_add_string(wb, "type", "select");
+ buffer_json_member_add_array(wb, "options");
+ {
+ available_journal_file_sources_to_json_array(wb);
+ }
+ buffer_json_array_close(wb); // options array
}
+ buffer_json_object_close(wb); // required params object
}
- else
- netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
+ buffer_json_array_close(wb); // required_params array
+
+ facets_table_config(wb);
+
+ buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
+ buffer_json_member_add_string(wb, "type", "table");
+ buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
+ buffer_json_finalize(wb);
+ response = HTTP_RESP_OK;
+ goto output;
}
- if(!s || feof(stdin) || ferror(stdin)) {
- plugin_should_exit = true;
- netdata_log_error("Received error on stdin.");
+ if(progress) {
+ function_systemd_journal_progress(wb, transaction, progress_id);
+ goto cleanup;
}
- exit(1);
+ response = netdata_systemd_journal_query(wb, facets, fqs);
+
+ // ------------------------------------------------------------------------
+ // cleanup query params
+
+ string_freez(fqs->source);
+ fqs->source = NULL;
+
+ // ------------------------------------------------------------------------
+ // handle error response
+
+ if(response != HTTP_RESP_OK) {
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_json_error_to_stdout(transaction, response, "failed");
+ netdata_mutex_unlock(&stdout_mutex);
+ goto cleanup;
+ }
+
+output:
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb);
+ netdata_mutex_unlock(&stdout_mutex);
+
+cleanup:
+ facets_destroy(facets);
+ buffer_free(wb);
+
+ if(fqs_item) {
+ dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item));
+ dictionary_acquired_item_release(function_query_status_dict, fqs_item);
+ dictionary_garbage_collect(function_query_status_dict);
+ }
}
+// ----------------------------------------------------------------------------
+
int main(int argc __maybe_unused, char **argv __maybe_unused) {
stderror = stderr;
clocks_init();
@@ -540,44 +2673,104 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
error_log_errors_per_period = 100;
error_log_throttle_period = 3600;
- // initialize the threads
- netdata_threads_init_for_external_plugins(0); // set the default threads stack size here
+ log_set_global_severity_for_external_plugins();
+
+ netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX");
+ if(verify_netdata_host_prefix() == -1) exit(1);
+
+ // ------------------------------------------------------------------------
+ // setup the journal directories
+
+ unsigned d = 0;
+
+ journal_directories[d++].path = strdupz("/var/log/journal");
+ journal_directories[d++].path = strdupz("/run/log/journal");
+
+ if(*netdata_configured_host_prefix) {
+ char path[PATH_MAX];
+ snprintfz(path, sizeof(path), "%s/var/log/journal", netdata_configured_host_prefix);
+ journal_directories[d++].path = strdupz(path);
+ snprintfz(path, sizeof(path), "%s/run/log/journal", netdata_configured_host_prefix);
+ journal_directories[d++].path = strdupz(path);
+ }
+
+ // terminate the list
+ journal_directories[d].path = NULL;
+
+ // ------------------------------------------------------------------------
+
+ function_query_status_dict = dictionary_create_advanced(
+ DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(FUNCTION_QUERY_STATUS));
+
+ // ------------------------------------------------------------------------
+ // initialize the used hashes files registry
+
+ used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+
+
+ // ------------------------------------------------------------------------
+ // initialize the journal files registry
+
+ systemd_journal_session = (now_realtime_usec() / USEC_PER_SEC) * USEC_PER_SEC;
+
+ journal_files_registry = dictionary_create_advanced(
+ DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(struct journal_file));
+
+ dictionary_register_insert_callback(journal_files_registry, files_registry_insert_cb, NULL);
+ dictionary_register_delete_callback(journal_files_registry, files_registry_delete_cb, NULL);
+ dictionary_register_conflict_callback(journal_files_registry, files_registry_conflict_cb, NULL);
+
+ boot_ids_to_first_ut = dictionary_create_advanced(
+ DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
+ NULL, sizeof(usec_t));
+
+ journal_files_registry_update();
- uids = dictionary_create(0);
- gids = dictionary_create(0);
// ------------------------------------------------------------------------
// debug
if(argc == 2 && strcmp(argv[1], "debug") == 0) {
- char buf[] = "systemd-journal after:-86400 before:0 last:500";
- function_systemd_journal("123", buf, "", 0, 30);
+ bool cancelled = false;
+ char buf[] = "systemd-journal after:-16000000 before:0 last:1";
+ // char buf[] = "systemd-journal after:1695332964 before:1695937764 direction:backward last:100 slice:true source:all DHKucpqUoe1:PtVoyIuX.MU";
+ // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403";
+ function_systemd_journal("123", buf, 600, &cancelled);
exit(1);
}
// ------------------------------------------------------------------------
+ // the event loop for functions
+
+ struct functions_evloop_globals *wg =
+ functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit);
+
+ functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal,
+ SYSTEMD_JOURNAL_DEFAULT_TIMEOUT);
- netdata_thread_t reader_thread;
- netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL);
// ------------------------------------------------------------------------
time_t started_t = now_monotonic_sec();
- size_t iteration;
+ size_t iteration = 0;
usec_t step = 1000 * USEC_PER_MS;
bool tty = isatty(fileno(stderr)) == 1;
- netdata_mutex_lock(&mutex);
+ netdata_mutex_lock(&stdout_mutex);
fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n",
SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION);
heartbeat_t hb;
heartbeat_init(&hb);
- for(iteration = 0; 1 ; iteration++) {
- netdata_mutex_unlock(&mutex);
+ while(!plugin_should_exit) {
+ iteration++;
+
+ netdata_mutex_unlock(&stdout_mutex);
heartbeat_next(&hb, step);
- netdata_mutex_lock(&mutex);
+ netdata_mutex_lock(&stdout_mutex);
if(!tty)
fprintf(stdout, "\n");
@@ -589,8 +2782,5 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) {
break;
}
- dictionary_destroy(uids);
- dictionary_destroy(gids);
-
exit(0);
}