diff options
Diffstat (limited to 'collectors/systemd-journal.plugin/systemd-journal.c')
-rw-r--r-- | collectors/systemd-journal.plugin/systemd-journal.c | 2664 |
1 files changed, 2427 insertions, 237 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/collectors/systemd-journal.plugin/systemd-journal.c index 304ff244a..c2bd98e7d 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/collectors/systemd-journal.plugin/systemd-journal.c @@ -5,29 +5,110 @@ * GPL v3+ */ -// TODO - 1) MARKDOC - #include "collectors/all.h" #include "libnetdata/libnetdata.h" #include "libnetdata/required_dummies.h" -#ifndef SD_JOURNAL_ALL_NAMESPACES -#define JOURNAL_NAMESPACE SD_JOURNAL_LOCAL_ONLY -#else -#define JOURNAL_NAMESPACE SD_JOURNAL_ALL_NAMESPACES -#endif - +#include <linux/capability.h> #include <systemd/sd-journal.h> #include <syslog.h> +/* + * TODO + * + * _UDEV_DEVLINK is frequently set more than once per field - support multi-value faces + * + */ + + +// ---------------------------------------------------------------------------- +// fstat64 overloading to speed up libsystemd +// https://github.com/systemd/systemd/pull/29261 + +#define ND_SD_JOURNAL_OPEN_FLAGS (0) + +#include <dlfcn.h> +#include <sys/stat.h> + +#define FSTAT_CACHE_MAX 1024 +struct fdstat64_cache_entry { + bool enabled; + bool updated; + int err_no; + struct stat64 stat; + int ret; + size_t cached_count; + size_t session; +}; + +struct fdstat64_cache_entry fstat64_cache[FSTAT_CACHE_MAX] = {0 }; +static __thread size_t fstat_thread_calls = 0; +static __thread size_t fstat_thread_cached_responses = 0; +static __thread bool enable_thread_fstat = false; +static __thread size_t fstat_caching_thread_session = 0; +static size_t fstat_caching_global_session = 0; + +static void fstat_cache_enable_on_thread(void) { + fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_ACQUIRE); + enable_thread_fstat = true; +} + +static void fstat_cache_disable_on_thread(void) { + fstat_caching_thread_session = __atomic_add_fetch(&fstat_caching_global_session, 1, __ATOMIC_RELEASE); + enable_thread_fstat = false; +} + +int fstat64(int fd, struct stat64 *buf) { + static int (*real_fstat)(int, struct stat64 *) = NULL; + if (!real_fstat) + real_fstat = dlsym(RTLD_NEXT, "fstat64"); + + fstat_thread_calls++; + + if(fd >= 0 && fd < FSTAT_CACHE_MAX) { + if(enable_thread_fstat && fstat64_cache[fd].session != fstat_caching_thread_session) { + fstat64_cache[fd].session = fstat_caching_thread_session; + fstat64_cache[fd].enabled = true; + fstat64_cache[fd].updated = false; + } + + if(fstat64_cache[fd].enabled && fstat64_cache[fd].updated && fstat64_cache[fd].session == fstat_caching_thread_session) { + fstat_thread_cached_responses++; + errno = fstat64_cache[fd].err_no; + *buf = fstat64_cache[fd].stat; + fstat64_cache[fd].cached_count++; + return fstat64_cache[fd].ret; + } + } + + int ret = real_fstat(fd, buf); + + if(fd >= 0 && fd < FSTAT_CACHE_MAX && fstat64_cache[fd].enabled) { + fstat64_cache[fd].ret = ret; + fstat64_cache[fd].updated = true; + fstat64_cache[fd].err_no = errno; + fstat64_cache[fd].stat = *buf; + fstat64_cache[fd].session = fstat_caching_thread_session; + } + + return ret; +} + +// ---------------------------------------------------------------------------- + #define FACET_MAX_VALUE_LENGTH 8192 +#define SYSTEMD_JOURNAL_MAX_SOURCE_LEN 64 #define SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION "View, search and analyze systemd journal entries." #define SYSTEMD_JOURNAL_FUNCTION_NAME "systemd-journal" -#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 30 +#define SYSTEMD_JOURNAL_DEFAULT_TIMEOUT 60 #define SYSTEMD_JOURNAL_MAX_PARAMS 100 -#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (3 * 3600) +#define SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION (1 * 3600) #define SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY 200 +#define SYSTEMD_JOURNAL_WORKER_THREADS 5 + +#define JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT (5 * USEC_PER_SEC) // assume always 5 seconds latency +#define JOURNAL_VS_REALTIME_DELTA_MAX_UT (2 * 60 * USEC_PER_SEC) // up to 2 minutes latency #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" @@ -35,147 +116,1540 @@ #define JOURNAL_PARAMETER_ANCHOR "anchor" #define JOURNAL_PARAMETER_LAST "last" #define JOURNAL_PARAMETER_QUERY "query" +#define JOURNAL_PARAMETER_FACETS "facets" +#define JOURNAL_PARAMETER_HISTOGRAM "histogram" +#define JOURNAL_PARAMETER_DIRECTION "direction" +#define JOURNAL_PARAMETER_IF_MODIFIED_SINCE "if_modified_since" +#define JOURNAL_PARAMETER_DATA_ONLY "data_only" +#define JOURNAL_PARAMETER_SOURCE "source" +#define JOURNAL_PARAMETER_INFO "info" +#define JOURNAL_PARAMETER_ID "id" +#define JOURNAL_PARAMETER_PROGRESS "progress" +#define JOURNAL_PARAMETER_SLICE "slice" +#define JOURNAL_PARAMETER_DELTA "delta" +#define JOURNAL_PARAMETER_TAIL "tail" + +#define JOURNAL_KEY_ND_JOURNAL_FILE "ND_JOURNAL_FILE" +#define JOURNAL_KEY_ND_JOURNAL_PROCESS "ND_JOURNAL_PROCESS" + +#define JOURNAL_DEFAULT_SLICE_MODE true +#define JOURNAL_DEFAULT_DIRECTION FACETS_ANCHOR_DIRECTION_BACKWARD #define SYSTEMD_ALWAYS_VISIBLE_KEYS NULL -#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS NULL + +#define SYSTEMD_KEYS_EXCLUDED_FROM_FACETS \ + "*MESSAGE*" \ + "|*_RAW" \ + "|*_USEC" \ + "|*_NSEC" \ + "|*TIMESTAMP*" \ + "|*_ID" \ + "|*_ID_*" \ + "|__*" \ + "" + #define SYSTEMD_KEYS_INCLUDED_IN_FACETS \ - "_TRANSPORT" \ - "|SYSLOG_IDENTIFIER" \ - "|SYSLOG_FACILITY" \ + \ + /* --- USER JOURNAL FIELDS --- */ \ + \ + /* "|MESSAGE" */ \ + /* "|MESSAGE_ID" */ \ "|PRIORITY" \ - "|_HOSTNAME" \ - "|_RUNTIME_SCOPE" \ - "|_PID" \ + "|CODE_FILE" \ + /* "|CODE_LINE" */ \ + "|CODE_FUNC" \ + "|ERRNO" \ + /* "|INVOCATION_ID" */ \ + /* "|USER_INVOCATION_ID" */ \ + "|SYSLOG_FACILITY" \ + "|SYSLOG_IDENTIFIER" \ + /* "|SYSLOG_PID" */ \ + /* "|SYSLOG_TIMESTAMP" */ \ + /* "|SYSLOG_RAW" */ \ + /* "!DOCUMENTATION" */ \ + /* "|TID" */ \ + "|UNIT" \ + "|USER_UNIT" \ + "|UNIT_RESULT" /* undocumented */ \ + \ + \ + /* --- TRUSTED JOURNAL FIELDS --- */ \ + \ + /* "|_PID" */ \ "|_UID" \ "|_GID" \ - "|_SYSTEMD_UNIT" \ - "|_SYSTEMD_SLICE" \ - "|_SYSTEMD_USER_SLICE" \ "|_COMM" \ "|_EXE" \ + /* "|_CMDLINE" */ \ + "|_CAP_EFFECTIVE" \ + /* "|_AUDIT_SESSION" */ \ + "|_AUDIT_LOGINUID" \ "|_SYSTEMD_CGROUP" \ + "|_SYSTEMD_SLICE" \ + "|_SYSTEMD_UNIT" \ "|_SYSTEMD_USER_UNIT" \ - "|USER_UNIT" \ - "|UNIT" \ + "|_SYSTEMD_USER_SLICE" \ + "|_SYSTEMD_SESSION" \ + "|_SYSTEMD_OWNER_UID" \ + "|_SELINUX_CONTEXT" \ + /* "|_SOURCE_REALTIME_TIMESTAMP" */ \ + "|_BOOT_ID" \ + "|_MACHINE_ID" \ + /* "|_SYSTEMD_INVOCATION_ID" */ \ + "|_HOSTNAME" \ + "|_TRANSPORT" \ + "|_STREAM_ID" \ + /* "|LINE_BREAK" */ \ + "|_NAMESPACE" \ + "|_RUNTIME_SCOPE" \ + \ + \ + /* --- KERNEL JOURNAL FIELDS --- */ \ + \ + /* "|_KERNEL_DEVICE" */ \ + "|_KERNEL_SUBSYSTEM" \ + /* "|_UDEV_SYSNAME" */ \ + "|_UDEV_DEVNODE" \ + /* "|_UDEV_DEVLINK" */ \ + \ + \ + /* --- LOGGING ON BEHALF --- */ \ + \ + "|OBJECT_UID" \ + "|OBJECT_GID" \ + "|OBJECT_COMM" \ + "|OBJECT_EXE" \ + /* "|OBJECT_CMDLINE" */ \ + /* "|OBJECT_AUDIT_SESSION" */ \ + "|OBJECT_AUDIT_LOGINUID" \ + "|OBJECT_SYSTEMD_CGROUP" \ + "|OBJECT_SYSTEMD_SESSION" \ + "|OBJECT_SYSTEMD_OWNER_UID" \ + "|OBJECT_SYSTEMD_UNIT" \ + "|OBJECT_SYSTEMD_USER_UNIT" \ + \ + \ + /* --- CORE DUMPS --- */ \ + \ + "|COREDUMP_COMM" \ + "|COREDUMP_UNIT" \ + "|COREDUMP_USER_UNIT" \ + "|COREDUMP_SIGNAL_NAME" \ + "|COREDUMP_CGROUP" \ + \ + \ + /* --- DOCKER --- */ \ + \ + "|CONTAINER_ID" \ + /* "|CONTAINER_ID_FULL" */ \ + "|CONTAINER_NAME" \ + "|CONTAINER_TAG" \ + "|IMAGE_NAME" /* undocumented */ \ + /* "|CONTAINER_PARTIAL_MESSAGE" */ \ + \ "" -static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER; +static netdata_mutex_t stdout_mutex = NETDATA_MUTEX_INITIALIZER; static bool plugin_should_exit = false; -DICTIONARY *uids = NULL; -DICTIONARY *gids = NULL; +// ---------------------------------------------------------------------------- +typedef enum { + ND_SD_JOURNAL_NO_FILE_MATCHED, + ND_SD_JOURNAL_FAILED_TO_OPEN, + ND_SD_JOURNAL_FAILED_TO_SEEK, + ND_SD_JOURNAL_TIMED_OUT, + ND_SD_JOURNAL_OK, + ND_SD_JOURNAL_NOT_MODIFIED, + ND_SD_JOURNAL_CANCELLED, +} ND_SD_JOURNAL_STATUS; + +typedef enum { + SDJF_ALL = 0, + SDJF_LOCAL = (1 << 0), + SDJF_REMOTE = (1 << 1), + SDJF_SYSTEM = (1 << 2), + SDJF_USER = (1 << 3), + SDJF_NAMESPACE = (1 << 4), + SDJF_OTHER = (1 << 5), +} SD_JOURNAL_FILE_SOURCE_TYPE; + +typedef struct function_query_status { + bool *cancelled; // a pointer to the cancelling boolean + usec_t stop_monotonic_ut; + + usec_t started_monotonic_ut; + + // request + SD_JOURNAL_FILE_SOURCE_TYPE source_type; + STRING *source; + usec_t after_ut; + usec_t before_ut; + + struct { + usec_t start_ut; + usec_t stop_ut; + } anchor; + + FACETS_ANCHOR_DIRECTION direction; + size_t entries; + usec_t if_modified_since; + bool delta; + bool tail; + bool data_only; + bool slice; + size_t filters; + usec_t last_modified; + const char *query; + const char *histogram; + + // per file progress info + size_t cached_count; + + // progress statistics + usec_t matches_setup_ut; + size_t rows_useful; + size_t rows_read; + size_t bytes_read; + size_t files_matched; + size_t file_working; +} FUNCTION_QUERY_STATUS; + +struct journal_file { + const char *filename; + size_t filename_len; + STRING *source; + SD_JOURNAL_FILE_SOURCE_TYPE source_type; + usec_t file_last_modified_ut; + usec_t msg_first_ut; + usec_t msg_last_ut; + usec_t last_scan_ut; + size_t size; + bool logged_failure; + usec_t max_journal_vs_realtime_delta_ut; +}; + +static void log_fqs(FUNCTION_QUERY_STATUS *fqs, const char *msg) { + netdata_log_error("ERROR: %s, on query " + "timeframe [%"PRIu64" - %"PRIu64"], " + "anchor [%"PRIu64" - %"PRIu64"], " + "if_modified_since %"PRIu64", " + "data_only:%s, delta:%s, tail:%s, direction:%s" + , msg + , fqs->after_ut, fqs->before_ut + , fqs->anchor.start_ut, fqs->anchor.stop_ut + , fqs->if_modified_since + , fqs->data_only ? "true" : "false" + , fqs->delta ? "true" : "false" + , fqs->tail ? "tail" : "false" + , fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward"); +} -// ---------------------------------------------------------------------------- +static inline bool netdata_systemd_journal_seek_to(sd_journal *j, usec_t timestamp) { + if(sd_journal_seek_realtime_usec(j, timestamp) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to %" PRIu64, timestamp); + if(sd_journal_seek_tail(j) < 0) { + netdata_log_error("SYSTEMD-JOURNAL: Failed to seek to journal's tail"); + return false; + } + } + + return true; +} + +#define JD_SOURCE_REALTIME_TIMESTAMP "_SOURCE_REALTIME_TIMESTAMP" -int systemd_journal_query(BUFFER *wb, FACETS *facets, usec_t after_ut, usec_t before_ut, usec_t stop_monotonic_ut) { - sd_journal *j; - int r; +static inline bool parse_journal_field(const char *data, size_t data_length, const char **key, size_t *key_length, const char **value, size_t *value_length) { + const char *k = data; + const char *equal = strchr(k, '='); + if(unlikely(!equal)) + return false; - // Open the system journal for reading - r = sd_journal_open(&j, JOURNAL_NAMESPACE); - if (r < 0) - return HTTP_RESP_INTERNAL_SERVER_ERROR; + size_t kl = equal - k; + + const char *v = ++equal; + size_t vl = data_length - kl - 1; + + *key = k; + *key_length = kl; + *value = v; + *value_length = vl; + + return true; +} + +static inline size_t netdata_systemd_journal_process_row(sd_journal *j, FACETS *facets, struct journal_file *jf, usec_t *msg_ut) { + const void *data; + size_t length, bytes = 0; + + facets_add_key_value_length(facets, JOURNAL_KEY_ND_JOURNAL_FILE, sizeof(JOURNAL_KEY_ND_JOURNAL_FILE) - 1, jf->filename, jf->filename_len); + + SD_JOURNAL_FOREACH_DATA(j, data, length) { + const char *key, *value; + size_t key_length, value_length; + + if(!parse_journal_field(data, length, &key, &key_length, &value, &value_length)) + continue; + +#ifdef NETDATA_INTERNAL_CHECKS + usec_t origin_journal_ut = *msg_ut; +#endif + if(unlikely(key_length == sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1 && + memcmp(key, JD_SOURCE_REALTIME_TIMESTAMP, sizeof(JD_SOURCE_REALTIME_TIMESTAMP) - 1) == 0)) { + usec_t ut = str2ull(value, NULL); + if(ut && ut < *msg_ut) { + usec_t delta = *msg_ut - ut; + *msg_ut = ut; + + if(delta > JOURNAL_VS_REALTIME_DELTA_MAX_UT) + delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT; + + // update max_journal_vs_realtime_delta_ut if the delta increased + usec_t expected = jf->max_journal_vs_realtime_delta_ut; + do { + if(delta <= expected) + break; + } while(!__atomic_compare_exchange_n(&jf->max_journal_vs_realtime_delta_ut, &expected, delta, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + internal_error(delta > expected, + "increased max_journal_vs_realtime_delta_ut from %"PRIu64" to %"PRIu64", " + "journal %"PRIu64", actual %"PRIu64" (delta %"PRIu64")" + , expected, delta, origin_journal_ut, *msg_ut, origin_journal_ut - (*msg_ut)); + } + } + + bytes += length; + facets_add_key_value_length(facets, key, key_length, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); + } + + return bytes; +} + +#define FUNCTION_PROGRESS_UPDATE_ROWS(rows_read, rows) __atomic_fetch_add(&(rows_read), rows, __ATOMIC_RELAXED) +#define FUNCTION_PROGRESS_UPDATE_BYTES(bytes_read, bytes) __atomic_fetch_add(&(bytes_read), bytes, __ATOMIC_RELAXED) +#define FUNCTION_PROGRESS_EVERY_ROWS (1ULL << 13) +#define FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS (1ULL << 7) + +static inline ND_SD_JOURNAL_STATUS check_stop(const bool *cancelled, const usec_t *stop_monotonic_ut) { + if(cancelled && __atomic_load_n(cancelled, __ATOMIC_RELAXED)) { + internal_error(true, "Function has been cancelled"); + return ND_SD_JOURNAL_CANCELLED; + } + + if(now_monotonic_usec() > __atomic_load_n(stop_monotonic_ut, __ATOMIC_RELAXED)) { + internal_error(true, "Function timed out"); + return ND_SD_JOURNAL_TIMED_OUT; + } + + return ND_SD_JOURNAL_OK; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED); + + usec_t start_ut = ((fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->before_ut) + anchor_delta; + usec_t stop_ut = (fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->after_ut; + bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut); + + if(!netdata_systemd_journal_seek_to(j, start_ut)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + usec_t earliest_msg_ut = 0; + size_t row_counter = 0, last_row_counter = 0, rows_useful = 0; + size_t bytes = 0, last_bytes = 0; + + usec_t last_usec_from = 0; + usec_t last_usec_to = 0; + + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; facets_rows_begin(facets); + while (status == ND_SD_JOURNAL_OK && sd_journal_previous(j) > 0) { + usec_t msg_ut = 0; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) { + errors_no_timestamp++; + continue; + } + + if(unlikely(msg_ut > earliest_msg_ut)) + earliest_msg_ut = msg_ut; - bool timed_out = false; - size_t row_counter = 0; - sd_journal_seek_realtime_usec(j, before_ut); - SD_JOURNAL_FOREACH_BACKWARDS(j) { - row_counter++; + if (unlikely(msg_ut > start_ut)) + continue; + + if (unlikely(msg_ut < stop_ut)) + break; - uint64_t msg_ut; - sd_journal_get_realtime_usec(j, &msg_ut); - if (msg_ut < after_ut) + bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + + // make sure each line gets a unique timestamp + if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) + msg_ut = --last_usec_from; + else + last_usec_from = last_usec_to = msg_ut; + + if(facets_row_finished(facets, msg_ut)) + rows_useful++; + + row_counter++; + if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && + stop_when_full && + facets_rows(facets) >= fqs->entries)) { + // stop the data only query + usec_t oldest = facets_row_oldest_ut(facets); + if(oldest && msg_ut < (oldest - anchor_delta)) break; + } - const void *data; - size_t length; - SD_JOURNAL_FOREACH_DATA(j, data, length) { - const char *key = data; - const char *equal = strchr(key, '='); - if(unlikely(!equal)) - continue; + if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + last_row_counter = row_counter; - const char *value = ++equal; - size_t key_length = value - key; // including '\0' + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + last_bytes = bytes; - char key_copy[key_length]; - memcpy(key_copy, key, key_length - 1); - key_copy[key_length - 1] = '\0'; + status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + } + } - size_t value_length = length - key_length; // without '\0' - facets_add_key_value_length(facets, key_copy, value, value_length <= FACET_MAX_VALUE_LENGTH ? value_length : FACET_MAX_VALUE_LENGTH); - } + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + + fqs->rows_useful += rows_useful; + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(earliest_msg_ut > fqs->last_modified) + fqs->last_modified = earliest_msg_ut; + + return status; +} + +ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward( + sd_journal *j, BUFFER *wb __maybe_unused, FACETS *facets, + struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + usec_t anchor_delta = __atomic_load_n(&jf->max_journal_vs_realtime_delta_ut, __ATOMIC_RELAXED); + + usec_t start_ut = (fqs->data_only && fqs->anchor.start_ut) ? fqs->anchor.start_ut : fqs->after_ut; + usec_t stop_ut = ((fqs->data_only && fqs->anchor.stop_ut) ? fqs->anchor.stop_ut : fqs->before_ut) + anchor_delta; + bool stop_when_full = (fqs->data_only && !fqs->anchor.stop_ut); + + if(!netdata_systemd_journal_seek_to(j, start_ut)) + return ND_SD_JOURNAL_FAILED_TO_SEEK; + + size_t errors_no_timestamp = 0; + usec_t earliest_msg_ut = 0; + size_t row_counter = 0, last_row_counter = 0, rows_useful = 0; + size_t bytes = 0, last_bytes = 0; + + usec_t last_usec_from = 0; + usec_t last_usec_to = 0; + + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_OK; + + facets_rows_begin(facets); + while (status == ND_SD_JOURNAL_OK && sd_journal_next(j) > 0) { + usec_t msg_ut = 0; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0 || !msg_ut) { + errors_no_timestamp++; + continue; + } + + if(likely(msg_ut > earliest_msg_ut)) + earliest_msg_ut = msg_ut; - facets_row_finished(facets, msg_ut); + if (unlikely(msg_ut < start_ut)) + continue; - if((row_counter % 100) == 0 && now_monotonic_usec() > stop_monotonic_ut) { - timed_out = true; + if (unlikely(msg_ut > stop_ut)) + break; + + bytes += netdata_systemd_journal_process_row(j, facets, jf, &msg_ut); + + // make sure each line gets a unique timestamp + if(unlikely(msg_ut >= last_usec_from && msg_ut <= last_usec_to)) + msg_ut = ++last_usec_to; + else + last_usec_from = last_usec_to = msg_ut; + + if(facets_row_finished(facets, msg_ut)) + rows_useful++; + + row_counter++; + if(unlikely((row_counter % FUNCTION_DATA_ONLY_CHECK_EVERY_ROWS) == 0 && + stop_when_full && + facets_rows(facets) >= fqs->entries)) { + // stop the data only query + usec_t newest = facets_row_newest_ut(facets); + if(newest && msg_ut > (newest + anchor_delta)) break; + } + + if(unlikely(row_counter % FUNCTION_PROGRESS_EVERY_ROWS == 0)) { + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + last_row_counter = row_counter; + + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + last_bytes = bytes; + + status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + } + } + + FUNCTION_PROGRESS_UPDATE_ROWS(fqs->rows_read, row_counter - last_row_counter); + FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); + + fqs->rows_useful += rows_useful; + + if(errors_no_timestamp) + netdata_log_error("SYSTEMD-JOURNAL: %zu lines did not have timestamps", errors_no_timestamp); + + if(earliest_msg_ut > fqs->last_modified) + fqs->last_modified = earliest_msg_ut; + + return status; +} + +bool netdata_systemd_journal_check_if_modified_since(sd_journal *j, usec_t seek_to, usec_t last_modified) { + // return true, if data have been modified since the timestamp + + if(!last_modified || !seek_to) + return false; + + if(!netdata_systemd_journal_seek_to(j, seek_to)) + return false; + + usec_t first_msg_ut = 0; + while (sd_journal_previous(j) > 0) { + usec_t msg_ut; + if(sd_journal_get_realtime_usec(j, &msg_ut) < 0) + continue; + + first_msg_ut = msg_ut; + break; + } + + return first_msg_ut != last_modified; +} + +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS +static bool netdata_systemd_filtering_by_journal(sd_journal *j, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) { + const char *field = NULL; + const void *data = NULL; + size_t data_length; + size_t added_keys = 0; + size_t failures = 0; + size_t filters_added = 0; + + SD_JOURNAL_FOREACH_FIELD(j, field) { + bool interesting; + + if(fqs->data_only) + interesting = facets_key_name_is_filter(facets, field); + else + interesting = facets_key_name_is_facet(facets, field); + + if(interesting) { + if(sd_journal_query_unique(j, field) >= 0) { + bool added_this_key = false; + size_t added_values = 0; + + SD_JOURNAL_FOREACH_UNIQUE(j, data, data_length) { + const char *key, *value; + size_t key_length, value_length; + + if(!parse_journal_field(data, data_length, &key, &key_length, &value, &value_length)) + continue; + + facets_add_possible_value_name_to_key(facets, key, key_length, value, value_length); + + if(!facets_key_name_value_length_is_selected(facets, key, key_length, value, value_length)) + continue; + + if(added_keys && !added_this_key) { + if(sd_journal_add_conjunction(j) < 0) + failures++; + + added_this_key = true; + added_keys++; + } + else if(added_values) + if(sd_journal_add_disjunction(j) < 0) + failures++; + + if(sd_journal_add_match(j, data, data_length) < 0) + failures++; + + added_values++; + filters_added++; + } } } + } + + if(failures) { + log_fqs(fqs, "failed to setup journal filter, will run the full query."); + sd_journal_flush_matches(j); + return true; + } + + return filters_added ? true : false; +} +#endif // HAVE_SD_JOURNAL_RESTART_FIELDS + +static ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_one_file( + const char *filename, BUFFER *wb, FACETS *facets, + struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + sd_journal *j = NULL; + errno = 0; + + fstat_cache_enable_on_thread(); + + const char *paths[2] = { + [0] = filename, + [1] = NULL, + }; + + if(sd_journal_open_files(&j, paths, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) { + fstat_cache_disable_on_thread(); + return ND_SD_JOURNAL_FAILED_TO_OPEN; + } + + ND_SD_JOURNAL_STATUS status; + bool matches_filters = true; + +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS + if(fqs->slice) { + usec_t started = now_monotonic_usec(); + + matches_filters = netdata_systemd_filtering_by_journal(j, facets, fqs) || !fqs->filters; + usec_t ended = now_monotonic_usec(); + + fqs->matches_setup_ut += (ended - started); + } +#endif // HAVE_SD_JOURNAL_RESTART_FIELDS + + if(matches_filters) { + if(fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD) + status = netdata_systemd_journal_query_forward(j, wb, facets, jf, fqs); + else + status = netdata_systemd_journal_query_backward(j, wb, facets, jf, fqs); + } + else + status = ND_SD_JOURNAL_NO_FILE_MATCHED; + + sd_journal_close(j); + fstat_cache_disable_on_thread(); + + return status; +} + +// ---------------------------------------------------------------------------- +// journal files registry + +#define VAR_LOG_JOURNAL_MAX_DEPTH 10 +#define MAX_JOURNAL_DIRECTORIES 100 + +struct journal_directory { + char *path; + bool logged_failure; +}; + +static struct journal_directory journal_directories[MAX_JOURNAL_DIRECTORIES] = { 0 }; +static DICTIONARY *journal_files_registry = NULL; +static DICTIONARY *used_hashes_registry = NULL; + +static usec_t systemd_journal_session = 0; + +static void buffer_json_journal_versions(BUFFER *wb) { + buffer_json_member_add_object(wb, "versions"); + { + buffer_json_member_add_uint64(wb, "sources", + systemd_journal_session + dictionary_version(journal_files_registry)); + } + buffer_json_object_close(wb); +} + +static void journal_file_update_msg_ut(const char *filename, struct journal_file *jf) { + fstat_cache_enable_on_thread(); + + const char *files[2] = { + [0] = filename, + [1] = NULL, + }; + + sd_journal *j = NULL; + if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) { + fstat_cache_disable_on_thread(); + + if(!jf->logged_failure) { + netdata_log_error("cannot open journal file '%s', using file timestamps to understand time-frame.", filename); + jf->logged_failure = true; + } + + jf->msg_first_ut = 0; + jf->msg_last_ut = jf->file_last_modified_ut; + return; + } + + usec_t first_ut = 0, last_ut = 0; + + if(sd_journal_seek_head(j) < 0 || sd_journal_next(j) < 0 || sd_journal_get_realtime_usec(j, &first_ut) < 0 || !first_ut) { + internal_error(true, "cannot find the timestamp of the first message in '%s'", filename); + first_ut = 0; + } + + if(sd_journal_seek_tail(j) < 0 || sd_journal_previous(j) < 0 || sd_journal_get_realtime_usec(j, &last_ut) < 0 || !last_ut) { + internal_error(true, "cannot find the timestamp of the last message in '%s'", filename); + last_ut = jf->file_last_modified_ut; + } sd_journal_close(j); + fstat_cache_disable_on_thread(); + + if(first_ut > last_ut) { + internal_error(true, "timestamps are flipped in file '%s'", filename); + usec_t t = first_ut; + first_ut = last_ut; + last_ut = t; + } + + jf->msg_first_ut = first_ut; + jf->msg_last_ut = last_ut; +} + +static STRING *string_strdupz_source(const char *s, const char *e, size_t max_len, const char *prefix) { + char buf[max_len]; + size_t len; + char *dst = buf; + + if(prefix) { + len = strlen(prefix); + memcpy(buf, prefix, len); + dst = &buf[len]; + max_len -= len; + } + + len = e - s; + if(len >= max_len) + len = max_len - 1; + memcpy(dst, s, len); + dst[len] = '\0'; + buf[max_len - 1] = '\0'; + + for(size_t i = 0; buf[i] ;i++) + if(!isalnum(buf[i]) && buf[i] != '-' && buf[i] != '.' && buf[i] != ':') + buf[i] = '_'; + + return string_strdupz(buf); +} + +static void files_registry_insert_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) { + struct journal_file *jf = value; + jf->filename = dictionary_acquired_item_name(item); + jf->filename_len = strlen(jf->filename); + + // based on the filename + // decide the source to show to the user + const char *s = strrchr(jf->filename, '/'); + if(s) { + if(strstr(jf->filename, "/remote/")) + jf->source_type = SDJF_REMOTE; + else { + const char *t = s - 1; + while(t >= jf->filename && *t != '.' && *t != '/') + t--; + + if(t >= jf->filename && *t == '.') { + jf->source_type = SDJF_NAMESPACE; + jf->source = string_strdupz_source(t + 1, s, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "namespace-"); + } + else + jf->source_type = SDJF_LOCAL; + } + + if(strncmp(s, "/system", 7) == 0) + jf->source_type |= SDJF_SYSTEM; + + else if(strncmp(s, "/user", 5) == 0) + jf->source_type |= SDJF_USER; + + else if(strncmp(s, "/remote-", 8) == 0) { + jf->source_type |= SDJF_REMOTE; + + s = &s[8]; // skip "/remote-" + + char *e = strchr(s, '@'); + if(!e) + e = strstr(s, ".journal"); + + if(e) { + const char *d = s; + for(; d < e && (isdigit(*d) || *d == '.' || *d == ':') ; d++) ; + if(d == e) { + // a valid IP address + char ip[e - s + 1]; + memcpy(ip, s, e - s); + ip[e - s] = '\0'; + char buf[SYSTEMD_JOURNAL_MAX_SOURCE_LEN]; + if(ip_to_hostname(ip, buf, sizeof(buf))) + jf->source = string_strdupz_source(buf, &buf[strlen(buf)], SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); + else { + internal_error(true, "Cannot find the hostname for IP '%s'", ip); + jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); + } + } + else + jf->source = string_strdupz_source(s, e, SYSTEMD_JOURNAL_MAX_SOURCE_LEN, "remote-"); + } + else + jf->source_type |= SDJF_OTHER; + } + else + jf->source_type |= SDJF_OTHER; + } + else + jf->source_type = SDJF_LOCAL | SDJF_OTHER; + + journal_file_update_msg_ut(jf->filename, jf); + + internal_error(true, + "found journal file '%s', type %d, source '%s', " + "file modified: %"PRIu64", " + "msg {first: %"PRIu64", last: %"PRIu64"}", + jf->filename, jf->source_type, jf->source ? string2str(jf->source) : "<unset>", + jf->file_last_modified_ut, + jf->msg_first_ut, jf->msg_last_ut); +} + +static bool files_registry_conflict_cb(const DICTIONARY_ITEM *item, void *old_value, void *new_value, void *data __maybe_unused) { + struct journal_file *jf = old_value; + struct journal_file *njf = new_value; + + if(njf->last_scan_ut > jf->last_scan_ut) + jf->last_scan_ut = njf->last_scan_ut; + + if(njf->file_last_modified_ut > jf->file_last_modified_ut) { + jf->file_last_modified_ut = njf->file_last_modified_ut; + jf->size = njf->size; + + const char *filename = dictionary_acquired_item_name(item); + journal_file_update_msg_ut(filename, jf); + +// internal_error(true, +// "updated journal file '%s', type %d, " +// "file modified: %"PRIu64", " +// "msg {first: %"PRIu64", last: %"PRIu64"}", +// filename, jf->source_type, +// jf->file_last_modified_ut, +// jf->msg_first_ut, jf->msg_last_ut); + } + + return false; +} + +#define SDJF_SOURCE_ALL_NAME "all" +#define SDJF_SOURCE_LOCAL_NAME "all-local-logs" +#define SDJF_SOURCE_LOCAL_SYSTEM_NAME "all-local-system-logs" +#define SDJF_SOURCE_LOCAL_USERS_NAME "all-local-user-logs" +#define SDJF_SOURCE_LOCAL_OTHER_NAME "all-uncategorized" +#define SDJF_SOURCE_NAMESPACES_NAME "all-local-namespaces" +#define SDJF_SOURCE_REMOTES_NAME "all-remote-systems" + +struct journal_file_source { + usec_t first_ut; + usec_t last_ut; + size_t count; + uint64_t size; +}; + +static void human_readable_size_ib(uint64_t size, char *dst, size_t dst_len) { + if(size > 1024ULL * 1024 * 1024 * 1024) + snprintfz(dst, dst_len, "%0.2f TiB", (double)size / 1024.0 / 1024.0 / 1024.0 / 1024.0); + else if(size > 1024ULL * 1024 * 1024) + snprintfz(dst, dst_len, "%0.2f GiB", (double)size / 1024.0 / 1024.0 / 1024.0); + else if(size > 1024ULL * 1024) + snprintfz(dst, dst_len, "%0.2f MiB", (double)size / 1024.0 / 1024.0); + else if(size > 1024ULL) + snprintfz(dst, dst_len, "%0.2f KiB", (double)size / 1024.0); + else + snprintfz(dst, dst_len, "%"PRIu64" B", size); +} + +#define print_duration(dst, dst_len, pos, remaining, duration, one, many, printed) do { \ + if((remaining) > (duration)) { \ + uint64_t _count = (remaining) / (duration); \ + uint64_t _rem = (remaining) - (_count * (duration)); \ + (pos) += snprintfz(&(dst)[pos], (dst_len) - (pos), "%s%s%"PRIu64" %s", (printed) ? ", " : "", _rem ? "" : "and ", _count, _count > 1 ? (many) : (one)); \ + (remaining) = _rem; \ + (printed) = true; \ + } \ +} while(0) + +static void human_readable_duration_s(time_t duration_s, char *dst, size_t dst_len) { + if(duration_s < 0) + duration_s = -duration_s; + + size_t pos = 0; + dst[0] = 0 ; + + bool printed = false; + print_duration(dst, dst_len, pos, duration_s, 86400 * 365, "year", "years", printed); + print_duration(dst, dst_len, pos, duration_s, 86400 * 30, "month", "months", printed); + print_duration(dst, dst_len, pos, duration_s, 86400 * 1, "day", "days", printed); + print_duration(dst, dst_len, pos, duration_s, 3600 * 1, "hour", "hours", printed); + print_duration(dst, dst_len, pos, duration_s, 60 * 1, "min", "mins", printed); + print_duration(dst, dst_len, pos, duration_s, 1, "sec", "secs", printed); +} + +static int journal_file_to_json_array_cb(const DICTIONARY_ITEM *item, void *entry, void *data) { + struct journal_file_source *jfs = entry; + BUFFER *wb = data; + + const char *name = dictionary_acquired_item_name(item); + + buffer_json_add_array_item_object(wb); + { + char size_for_humans[100]; + human_readable_size_ib(jfs->size, size_for_humans, sizeof(size_for_humans)); + + char duration_for_humans[1024]; + human_readable_duration_s((time_t)((jfs->last_ut - jfs->first_ut) / USEC_PER_SEC), + duration_for_humans, sizeof(duration_for_humans)); + + char info[1024]; + snprintfz(info, sizeof(info), "%zu files, with a total size of %s, covering %s", + jfs->count, size_for_humans, duration_for_humans); + + buffer_json_member_add_string(wb, "id", name); + buffer_json_member_add_string(wb, "name", name); + buffer_json_member_add_string(wb, "pill", size_for_humans); + buffer_json_member_add_string(wb, "info", info); + } + buffer_json_object_close(wb); // options object + + return 1; +} + +static bool journal_file_merge_sizes(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value , void *data __maybe_unused) { + struct journal_file_source *jfs = old_value, *njfs = new_value; + jfs->count += njfs->count; + jfs->size += njfs->size; + + if(njfs->first_ut && njfs->first_ut < jfs->first_ut) + jfs->first_ut = njfs->first_ut; + + if(njfs->last_ut && njfs->last_ut > jfs->last_ut) + jfs->last_ut = njfs->last_ut; + + return false; +} + +static void available_journal_file_sources_to_json_array(BUFFER *wb) { + DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED|DICT_OPTION_NAME_LINK_DONT_CLONE|DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_conflict_callback(dict, journal_file_merge_sizes, NULL); + + struct journal_file_source t = { 0 }; + + struct journal_file *jf; + dfe_start_read(journal_files_registry, jf) { + t.first_ut = jf->msg_first_ut; + t.last_ut = jf->msg_last_ut; + t.count = 1; + t.size = jf->size; + + dictionary_set(dict, SDJF_SOURCE_ALL_NAME, &t, sizeof(t)); + + if((jf->source_type & (SDJF_LOCAL)) == (SDJF_LOCAL)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_LOCAL | SDJF_SYSTEM)) == (SDJF_LOCAL | SDJF_SYSTEM)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_SYSTEM_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_LOCAL | SDJF_USER)) == (SDJF_LOCAL | SDJF_USER)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_USERS_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_LOCAL | SDJF_OTHER)) == (SDJF_LOCAL | SDJF_OTHER)) + dictionary_set(dict, SDJF_SOURCE_LOCAL_OTHER_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_NAMESPACE)) == (SDJF_NAMESPACE)) + dictionary_set(dict, SDJF_SOURCE_NAMESPACES_NAME, &t, sizeof(t)); + if((jf->source_type & (SDJF_REMOTE)) == (SDJF_REMOTE)) + dictionary_set(dict, SDJF_SOURCE_REMOTES_NAME, &t, sizeof(t)); + if(jf->source) + dictionary_set(dict, string2str(jf->source), &t, sizeof(t)); + } + dfe_done(jf); + + dictionary_sorted_walkthrough_read(dict, journal_file_to_json_array_cb, wb); + + dictionary_destroy(dict); +} + +static void files_registry_delete_cb(const DICTIONARY_ITEM *item, void *value, void *data __maybe_unused) { + struct journal_file *jf = value; (void)jf; + const char *filename = dictionary_acquired_item_name(item); (void)filename; + + string_freez(jf->source); + internal_error(true, "removed journal file '%s'", filename); +} + +void journal_directory_scan(const char *dirname, int depth, usec_t last_scan_ut) { + static const char *ext = ".journal"; + static const size_t ext_len = sizeof(".journal") - 1; + + if (depth > VAR_LOG_JOURNAL_MAX_DEPTH) + return; + + DIR *dir; + struct dirent *entry; + struct stat info; + char absolute_path[FILENAME_MAX]; + + // Open the directory. + if ((dir = opendir(dirname)) == NULL) { + if(errno != ENOENT && errno != ENOTDIR) + netdata_log_error("Cannot opendir() '%s'", dirname); + return; + } + + // Read each entry in the directory. + while ((entry = readdir(dir)) != NULL) { + snprintfz(absolute_path, sizeof(absolute_path), "%s/%s", dirname, entry->d_name); + if (stat(absolute_path, &info) != 0) { + netdata_log_error("Failed to stat() '%s", absolute_path); + continue; + } + + if (S_ISDIR(info.st_mode)) { + // If entry is a directory, call traverse recursively. + if (strcmp(entry->d_name, ".") != 0 && strcmp(entry->d_name, "..") != 0) + journal_directory_scan(absolute_path, depth + 1, last_scan_ut); + + } + else if (S_ISREG(info.st_mode)) { + // If entry is a regular file, check if it ends with .journal. + char *filename = entry->d_name; + size_t len = strlen(filename); + + if (len > ext_len && strcmp(filename + len - ext_len, ext) == 0) { + struct journal_file t = { + .file_last_modified_ut = info.st_mtim.tv_sec * USEC_PER_SEC + info.st_mtim.tv_nsec / NSEC_PER_USEC, + .last_scan_ut = last_scan_ut, + .size = info.st_size, + .max_journal_vs_realtime_delta_ut = JOURNAL_VS_REALTIME_DELTA_DEFAULT_UT, + }; + dictionary_set(journal_files_registry, absolute_path, &t, sizeof(t)); + } + } + } + + closedir(dir); +} + +static void journal_files_registry_update() { + usec_t scan_ut = now_monotonic_usec(); + + for(unsigned i = 0; i < MAX_JOURNAL_DIRECTORIES ;i++) { + if(!journal_directories[i].path) + break; + + journal_directory_scan(journal_directories[i].path, 0, scan_ut); + } + + struct journal_file *jf; + dfe_start_write(journal_files_registry, jf) { + if(jf->last_scan_ut < scan_ut) + dictionary_del(journal_files_registry, jf_dfe.name); + } + dfe_done(jf); +} + +// ---------------------------------------------------------------------------- + +static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { + + if((fqs->source_type == SDJF_ALL || (jf->source_type & fqs->source_type) == fqs->source_type) && + (!fqs->source || fqs->source == jf->source)) { + + usec_t anchor_delta = JOURNAL_VS_REALTIME_DELTA_MAX_UT; + usec_t first_ut = jf->msg_first_ut; + usec_t last_ut = jf->msg_last_ut + anchor_delta; + + if(last_ut >= fqs->after_ut && first_ut <= fqs->before_ut) + return true; + } + + return false; +} + +static int journal_file_dict_items_backward_compar(const void *a, const void *b) { + const DICTIONARY_ITEM **ad = (const DICTIONARY_ITEM **)a, **bd = (const DICTIONARY_ITEM **)b; + struct journal_file *jfa = dictionary_acquired_item_value(*ad); + struct journal_file *jfb = dictionary_acquired_item_value(*bd); + + if(jfa->msg_last_ut < jfb->msg_last_ut) + return 1; + + if(jfa->msg_last_ut > jfb->msg_last_ut) + return -1; + + if(jfa->msg_first_ut < jfb->msg_first_ut) + return 1; + + if(jfa->msg_first_ut > jfb->msg_first_ut) + return -1; + + return 0; +} + +static int journal_file_dict_items_forward_compar(const void *a, const void *b) { + return -journal_file_dict_items_backward_compar(a, b); +} + +static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QUERY_STATUS *fqs) { + ND_SD_JOURNAL_STATUS status = ND_SD_JOURNAL_NO_FILE_MATCHED; + struct journal_file *jf; + + fqs->files_matched = 0; + fqs->file_working = 0; + fqs->rows_useful = 0; + fqs->rows_read = 0; + fqs->bytes_read = 0; + + size_t files_used = 0; + size_t files_max = dictionary_entries(journal_files_registry); + const DICTIONARY_ITEM *file_items[files_max]; + + // count the files + bool files_are_newer = false; + dfe_start_read(journal_files_registry, jf) { + if(!jf_is_mine(jf, fqs)) + continue; + + file_items[files_used++] = dictionary_acquired_item_dup(journal_files_registry, jf_dfe.item); + + if(jf->msg_last_ut > fqs->if_modified_since) + files_are_newer = true; + } + dfe_done(jf); + + fqs->files_matched = files_used; + + if(fqs->if_modified_since && !files_are_newer) { + buffer_flush(wb); + return HTTP_RESP_NOT_MODIFIED; + } + + // sort the files, so that they are optimal for facets + if(files_used >= 2) { + if (fqs->direction == FACETS_ANCHOR_DIRECTION_BACKWARD) + qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *), + journal_file_dict_items_backward_compar); + else + qsort(file_items, files_used, sizeof(const DICTIONARY_ITEM *), + journal_file_dict_items_forward_compar); + } + + bool partial = false; + usec_t started_ut; + usec_t ended_ut = now_monotonic_usec(); + + buffer_json_member_add_array(wb, "_journal_files"); + for(size_t f = 0; f < files_used ;f++) { + const char *filename = dictionary_acquired_item_name(file_items[f]); + jf = dictionary_acquired_item_value(file_items[f]); + + if(!jf_is_mine(jf, fqs)) + continue; + + fqs->file_working++; + fqs->cached_count = 0; + + size_t fs_calls = fstat_thread_calls; + size_t fs_cached = fstat_thread_cached_responses; + size_t rows_useful = fqs->rows_useful; + size_t rows_read = fqs->rows_read; + size_t bytes_read = fqs->bytes_read; + size_t matches_setup_ut = fqs->matches_setup_ut; + + ND_SD_JOURNAL_STATUS tmp_status = netdata_systemd_journal_query_one_file(filename, wb, facets, jf, fqs); + + rows_useful = fqs->rows_useful - rows_useful; + rows_read = fqs->rows_read - rows_read; + bytes_read = fqs->bytes_read - bytes_read; + matches_setup_ut = fqs->matches_setup_ut - matches_setup_ut; + fs_calls = fstat_thread_calls - fs_calls; + fs_cached = fstat_thread_cached_responses - fs_cached; + + started_ut = ended_ut; + ended_ut = now_monotonic_usec(); + usec_t duration_ut = ended_ut - started_ut; + + buffer_json_add_array_item_object(wb); // journal file + { + // information about the file + buffer_json_member_add_string(wb, "_filename", filename); + buffer_json_member_add_uint64(wb, "_source_type", jf->source_type); + buffer_json_member_add_string(wb, "_source", string2str(jf->source)); + buffer_json_member_add_uint64(wb, "_last_modified_ut", jf->file_last_modified_ut); + buffer_json_member_add_uint64(wb, "_msg_first_ut", jf->msg_first_ut); + buffer_json_member_add_uint64(wb, "_msg_last_ut", jf->msg_last_ut); + buffer_json_member_add_uint64(wb, "_journal_vs_realtime_delta_ut", jf->max_journal_vs_realtime_delta_ut); + + // information about the current use of the file + buffer_json_member_add_uint64(wb, "duration_ut", ended_ut - started_ut); + buffer_json_member_add_uint64(wb, "rows_read", rows_read); + buffer_json_member_add_uint64(wb, "rows_useful", rows_useful); + buffer_json_member_add_double(wb, "rows_per_second", (double) rows_read / (double) duration_ut * (double) USEC_PER_SEC); + buffer_json_member_add_uint64(wb, "bytes_read", bytes_read); + buffer_json_member_add_double(wb, "bytes_per_second", (double) bytes_read / (double) duration_ut * (double) USEC_PER_SEC); + buffer_json_member_add_uint64(wb, "duration_matches_ut", matches_setup_ut); + buffer_json_member_add_uint64(wb, "fstat_query_calls", fs_calls); + buffer_json_member_add_uint64(wb, "fstat_query_cached_responses", fs_cached); + } + buffer_json_object_close(wb); // journal file + + bool stop = false; + switch(tmp_status) { + case ND_SD_JOURNAL_OK: + case ND_SD_JOURNAL_NO_FILE_MATCHED: + status = (status == ND_SD_JOURNAL_OK) ? ND_SD_JOURNAL_OK : tmp_status; + break; + + case ND_SD_JOURNAL_FAILED_TO_OPEN: + case ND_SD_JOURNAL_FAILED_TO_SEEK: + partial = true; + if(status == ND_SD_JOURNAL_NO_FILE_MATCHED) + status = tmp_status; + break; + + case ND_SD_JOURNAL_CANCELLED: + case ND_SD_JOURNAL_TIMED_OUT: + partial = true; + stop = true; + status = tmp_status; + break; + + case ND_SD_JOURNAL_NOT_MODIFIED: + internal_fatal(true, "this should never be returned here"); + break; + } + + if(stop) + break; + } + buffer_json_array_close(wb); // _journal_files + + // release the files + for(size_t f = 0; f < files_used ;f++) + dictionary_acquired_item_release(journal_files_registry, file_items[f]); + + switch (status) { + case ND_SD_JOURNAL_OK: + if(fqs->if_modified_since && !fqs->rows_useful) { + buffer_flush(wb); + return HTTP_RESP_NOT_MODIFIED; + } + break; + + case ND_SD_JOURNAL_TIMED_OUT: + case ND_SD_JOURNAL_NO_FILE_MATCHED: + break; + + case ND_SD_JOURNAL_CANCELLED: + buffer_flush(wb); + return HTTP_RESP_CLIENT_CLOSED_REQUEST; + + case ND_SD_JOURNAL_NOT_MODIFIED: + buffer_flush(wb); + return HTTP_RESP_NOT_MODIFIED; + + default: + case ND_SD_JOURNAL_FAILED_TO_OPEN: + case ND_SD_JOURNAL_FAILED_TO_SEEK: + buffer_flush(wb); + return HTTP_RESP_INTERNAL_SERVER_ERROR; + } buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); - buffer_json_member_add_boolean(wb, "partial", timed_out); + buffer_json_member_add_boolean(wb, "partial", partial); buffer_json_member_add_string(wb, "type", "table"); - buffer_json_member_add_time_t(wb, "update_every", 1); - buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); - facets_report(facets, wb); + if(!fqs->data_only) { + buffer_json_member_add_time_t(wb, "update_every", 1); + buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); + } + + if(!fqs->data_only || fqs->tail) + buffer_json_member_add_uint64(wb, "last_modified", fqs->last_modified); + + facets_sort_and_reorder_keys(facets); + facets_report(facets, wb, used_hashes_registry); + + buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + (fqs->data_only ? 3600 : 0)); - buffer_json_member_add_time_t(wb, "expires", now_realtime_sec()); + buffer_json_member_add_object(wb, "_fstat_caching"); + { + buffer_json_member_add_uint64(wb, "calls", fstat_thread_calls); + buffer_json_member_add_uint64(wb, "cached", fstat_thread_cached_responses); + } + buffer_json_object_close(wb); // _fstat_caching buffer_json_finalize(wb); return HTTP_RESP_OK; } -static void systemd_journal_function_help(const char *transaction) { - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600); - fprintf(stdout, +static void netdata_systemd_journal_function_help(const char *transaction) { + BUFFER *wb = buffer_create(0, NULL); + buffer_sprintf(wb, "%s / %s\n" "\n" "%s\n" "\n" - "The following filters are supported:\n" + "The following parameters are supported:\n" "\n" - " help\n" + " "JOURNAL_PARAMETER_HELP"\n" " Shows this help message.\n" "\n" - " before:TIMESTAMP\n" + " "JOURNAL_PARAMETER_INFO"\n" + " Request initial configuration information about the plugin.\n" + " The key entity returned is the required_params array, which includes\n" + " all the available systemd journal sources.\n" + " When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n" + "\n" + " "JOURNAL_PARAMETER_ID":STRING\n" + " Caller supplied unique ID of the request.\n" + " This can be used later to request a progress report of the query.\n" + " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n" + "\n" + " "JOURNAL_PARAMETER_PROGRESS"\n" + " Request a progress report (the `id` of a running query is required).\n" + " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n" + "\n" + " "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n" + " Quickly respond with data requested, without generating a\n" + " `histogram`, `facets` counters and `items`.\n" + "\n" + " "JOURNAL_PARAMETER_DELTA":true or "JOURNAL_PARAMETER_DELTA":false\n" + " When doing data only queries, include deltas for histogram, facets and items.\n" + "\n" + " "JOURNAL_PARAMETER_TAIL":true or "JOURNAL_PARAMETER_TAIL":false\n" + " When doing data only queries, respond with the newest messages,\n" + " and up to the anchor, but calculate deltas (if requested) for\n" + " the duration [anchor - before].\n" + "\n" + " "JOURNAL_PARAMETER_SLICE":true or "JOURNAL_PARAMETER_SLICE":false\n" + " When it is turned on, the plugin is executing filtering via libsystemd,\n" + " utilizing all the available indexes of the journal files.\n" + " When it is off, only the time constraint is handled by libsystemd and\n" + " all filtering is done by the plugin.\n" + " The default is: %s\n" + "\n" + " "JOURNAL_PARAMETER_SOURCE":SOURCE\n" + " Query only the specified journal sources.\n" + " Do an `"JOURNAL_PARAMETER_INFO"` query to find the sources.\n" + "\n" + " "JOURNAL_PARAMETER_BEFORE":TIMESTAMP_IN_SECONDS\n" " Absolute or relative (to now) timestamp in seconds, to start the query.\n" " The query is always executed from the most recent to the oldest log entry.\n" " If not given the default is: now.\n" "\n" - " after:TIMESTAMP\n" + " "JOURNAL_PARAMETER_AFTER":TIMESTAMP_IN_SECONDS\n" " Absolute or relative (to `before`) timestamp in seconds, to end the query.\n" " If not given, the default is %d.\n" "\n" - " last:ITEMS\n" + " "JOURNAL_PARAMETER_LAST":ITEMS\n" " The number of items to return.\n" " The default is %d.\n" "\n" - " anchor:NUMBER\n" - " The `timestamp` of the item last received, to return log entries after that.\n" - " If not given, the query will return the top `ITEMS` from the most recent.\n" + " "JOURNAL_PARAMETER_ANCHOR":TIMESTAMP_IN_MICROSECONDS\n" + " Return items relative to this timestamp.\n" + " The exact items to be returned depend on the query `"JOURNAL_PARAMETER_DIRECTION"`.\n" + "\n" + " "JOURNAL_PARAMETER_DIRECTION":forward or "JOURNAL_PARAMETER_DIRECTION":backward\n" + " When set to `backward` (default) the items returned are the newest before the\n" + " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_BEFORE"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n" + " When set to `forward` the items returned are the oldest after the\n" + " `"JOURNAL_PARAMETER_ANCHOR"`, (or `"JOURNAL_PARAMETER_AFTER"` if `"JOURNAL_PARAMETER_ANCHOR"` is not set)\n" + " The default is: %s\n" + "\n" + " "JOURNAL_PARAMETER_QUERY":SIMPLE_PATTERN\n" + " Do a full text search to find the log entries matching the pattern given.\n" + " The plugin is searching for matches on all fields of the database.\n" + "\n" + " "JOURNAL_PARAMETER_IF_MODIFIED_SINCE":TIMESTAMP_IN_MICROSECONDS\n" + " Each successful response, includes a `last_modified` field.\n" + " By providing the timestamp to the `"JOURNAL_PARAMETER_IF_MODIFIED_SINCE"` parameter,\n" + " the plugin will return 200 with a successful response, or 304 if the source has not\n" + " been modified since that timestamp.\n" + "\n" + " "JOURNAL_PARAMETER_HISTOGRAM":facet_id\n" + " Use the given `facet_id` for the histogram.\n" + " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n" + "\n" + " "JOURNAL_PARAMETER_FACETS":facet_id1,facet_id2,facet_id3,...\n" + " Add the given facets to the list of fields for which analysis is required.\n" + " The plugin will offer both a histogram and facet value counters for its values.\n" + " This parameter is ignored in `"JOURNAL_PARAMETER_DATA_ONLY"` mode.\n" "\n" " facet_id:value_id1,value_id2,value_id3,...\n" " Apply filters to the query, based on the facet IDs returned.\n" " Each `facet_id` can be given once, but multiple `facet_ids` can be given.\n" "\n" - "Filters can be combined. Each filter can be given only one time.\n" , program_name , SYSTEMD_JOURNAL_FUNCTION_NAME , SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION + , JOURNAL_DEFAULT_SLICE_MODE ? "true" : "false" // slice , -SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION , SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY + , JOURNAL_DEFAULT_DIRECTION == FACETS_ANCHOR_DIRECTION_BACKWARD ? "backward" : "forward" ); - pluginsd_function_result_end_to_stdout(); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "text/plain", now_realtime_sec() + 3600, wb); + netdata_mutex_unlock(&stdout_mutex); + + buffer_free(wb); } +const char *errno_map[] = { + [1] = "1 (EPERM)", // "Operation not permitted", + [2] = "2 (ENOENT)", // "No such file or directory", + [3] = "3 (ESRCH)", // "No such process", + [4] = "4 (EINTR)", // "Interrupted system call", + [5] = "5 (EIO)", // "Input/output error", + [6] = "6 (ENXIO)", // "No such device or address", + [7] = "7 (E2BIG)", // "Argument list too long", + [8] = "8 (ENOEXEC)", // "Exec format error", + [9] = "9 (EBADF)", // "Bad file descriptor", + [10] = "10 (ECHILD)", // "No child processes", + [11] = "11 (EAGAIN)", // "Resource temporarily unavailable", + [12] = "12 (ENOMEM)", // "Cannot allocate memory", + [13] = "13 (EACCES)", // "Permission denied", + [14] = "14 (EFAULT)", // "Bad address", + [15] = "15 (ENOTBLK)", // "Block device required", + [16] = "16 (EBUSY)", // "Device or resource busy", + [17] = "17 (EEXIST)", // "File exists", + [18] = "18 (EXDEV)", // "Invalid cross-device link", + [19] = "19 (ENODEV)", // "No such device", + [20] = "20 (ENOTDIR)", // "Not a directory", + [21] = "21 (EISDIR)", // "Is a directory", + [22] = "22 (EINVAL)", // "Invalid argument", + [23] = "23 (ENFILE)", // "Too many open files in system", + [24] = "24 (EMFILE)", // "Too many open files", + [25] = "25 (ENOTTY)", // "Inappropriate ioctl for device", + [26] = "26 (ETXTBSY)", // "Text file busy", + [27] = "27 (EFBIG)", // "File too large", + [28] = "28 (ENOSPC)", // "No space left on device", + [29] = "29 (ESPIPE)", // "Illegal seek", + [30] = "30 (EROFS)", // "Read-only file system", + [31] = "31 (EMLINK)", // "Too many links", + [32] = "32 (EPIPE)", // "Broken pipe", + [33] = "33 (EDOM)", // "Numerical argument out of domain", + [34] = "34 (ERANGE)", // "Numerical result out of range", + [35] = "35 (EDEADLK)", // "Resource deadlock avoided", + [36] = "36 (ENAMETOOLONG)", // "File name too long", + [37] = "37 (ENOLCK)", // "No locks available", + [38] = "38 (ENOSYS)", // "Function not implemented", + [39] = "39 (ENOTEMPTY)", // "Directory not empty", + [40] = "40 (ELOOP)", // "Too many levels of symbolic links", + [42] = "42 (ENOMSG)", // "No message of desired type", + [43] = "43 (EIDRM)", // "Identifier removed", + [44] = "44 (ECHRNG)", // "Channel number out of range", + [45] = "45 (EL2NSYNC)", // "Level 2 not synchronized", + [46] = "46 (EL3HLT)", // "Level 3 halted", + [47] = "47 (EL3RST)", // "Level 3 reset", + [48] = "48 (ELNRNG)", // "Link number out of range", + [49] = "49 (EUNATCH)", // "Protocol driver not attached", + [50] = "50 (ENOCSI)", // "No CSI structure available", + [51] = "51 (EL2HLT)", // "Level 2 halted", + [52] = "52 (EBADE)", // "Invalid exchange", + [53] = "53 (EBADR)", // "Invalid request descriptor", + [54] = "54 (EXFULL)", // "Exchange full", + [55] = "55 (ENOANO)", // "No anode", + [56] = "56 (EBADRQC)", // "Invalid request code", + [57] = "57 (EBADSLT)", // "Invalid slot", + [59] = "59 (EBFONT)", // "Bad font file format", + [60] = "60 (ENOSTR)", // "Device not a stream", + [61] = "61 (ENODATA)", // "No data available", + [62] = "62 (ETIME)", // "Timer expired", + [63] = "63 (ENOSR)", // "Out of streams resources", + [64] = "64 (ENONET)", // "Machine is not on the network", + [65] = "65 (ENOPKG)", // "Package not installed", + [66] = "66 (EREMOTE)", // "Object is remote", + [67] = "67 (ENOLINK)", // "Link has been severed", + [68] = "68 (EADV)", // "Advertise error", + [69] = "69 (ESRMNT)", // "Srmount error", + [70] = "70 (ECOMM)", // "Communication error on send", + [71] = "71 (EPROTO)", // "Protocol error", + [72] = "72 (EMULTIHOP)", // "Multihop attempted", + [73] = "73 (EDOTDOT)", // "RFS specific error", + [74] = "74 (EBADMSG)", // "Bad message", + [75] = "75 (EOVERFLOW)", // "Value too large for defined data type", + [76] = "76 (ENOTUNIQ)", // "Name not unique on network", + [77] = "77 (EBADFD)", // "File descriptor in bad state", + [78] = "78 (EREMCHG)", // "Remote address changed", + [79] = "79 (ELIBACC)", // "Can not access a needed shared library", + [80] = "80 (ELIBBAD)", // "Accessing a corrupted shared library", + [81] = "81 (ELIBSCN)", // ".lib section in a.out corrupted", + [82] = "82 (ELIBMAX)", // "Attempting to link in too many shared libraries", + [83] = "83 (ELIBEXEC)", // "Cannot exec a shared library directly", + [84] = "84 (EILSEQ)", // "Invalid or incomplete multibyte or wide character", + [85] = "85 (ERESTART)", // "Interrupted system call should be restarted", + [86] = "86 (ESTRPIPE)", // "Streams pipe error", + [87] = "87 (EUSERS)", // "Too many users", + [88] = "88 (ENOTSOCK)", // "Socket operation on non-socket", + [89] = "89 (EDESTADDRREQ)", // "Destination address required", + [90] = "90 (EMSGSIZE)", // "Message too long", + [91] = "91 (EPROTOTYPE)", // "Protocol wrong type for socket", + [92] = "92 (ENOPROTOOPT)", // "Protocol not available", + [93] = "93 (EPROTONOSUPPORT)", // "Protocol not supported", + [94] = "94 (ESOCKTNOSUPPORT)", // "Socket type not supported", + [95] = "95 (ENOTSUP)", // "Operation not supported", + [96] = "96 (EPFNOSUPPORT)", // "Protocol family not supported", + [97] = "97 (EAFNOSUPPORT)", // "Address family not supported by protocol", + [98] = "98 (EADDRINUSE)", // "Address already in use", + [99] = "99 (EADDRNOTAVAIL)", // "Cannot assign requested address", + [100] = "100 (ENETDOWN)", // "Network is down", + [101] = "101 (ENETUNREACH)", // "Network is unreachable", + [102] = "102 (ENETRESET)", // "Network dropped connection on reset", + [103] = "103 (ECONNABORTED)", // "Software caused connection abort", + [104] = "104 (ECONNRESET)", // "Connection reset by peer", + [105] = "105 (ENOBUFS)", // "No buffer space available", + [106] = "106 (EISCONN)", // "Transport endpoint is already connected", + [107] = "107 (ENOTCONN)", // "Transport endpoint is not connected", + [108] = "108 (ESHUTDOWN)", // "Cannot send after transport endpoint shutdown", + [109] = "109 (ETOOMANYREFS)", // "Too many references: cannot splice", + [110] = "110 (ETIMEDOUT)", // "Connection timed out", + [111] = "111 (ECONNREFUSED)", // "Connection refused", + [112] = "112 (EHOSTDOWN)", // "Host is down", + [113] = "113 (EHOSTUNREACH)", // "No route to host", + [114] = "114 (EALREADY)", // "Operation already in progress", + [115] = "115 (EINPROGRESS)", // "Operation now in progress", + [116] = "116 (ESTALE)", // "Stale file handle", + [117] = "117 (EUCLEAN)", // "Structure needs cleaning", + [118] = "118 (ENOTNAM)", // "Not a XENIX named type file", + [119] = "119 (ENAVAIL)", // "No XENIX semaphores available", + [120] = "120 (EISNAM)", // "Is a named type file", + [121] = "121 (EREMOTEIO)", // "Remote I/O error", + [122] = "122 (EDQUOT)", // "Disk quota exceeded", + [123] = "123 (ENOMEDIUM)", // "No medium found", + [124] = "124 (EMEDIUMTYPE)", // "Wrong medium type", + [125] = "125 (ECANCELED)", // "Operation canceled", + [126] = "126 (ENOKEY)", // "Required key not available", + [127] = "127 (EKEYEXPIRED)", // "Key has expired", + [128] = "128 (EKEYREVOKED)", // "Key has been revoked", + [129] = "129 (EKEYREJECTED)", // "Key was rejected by service", + [130] = "130 (EOWNERDEAD)", // "Owner died", + [131] = "131 (ENOTRECOVERABLE)", // "State not recoverable", + [132] = "132 (ERFKILL)", // "Operation not possible due to RF-kill", + [133] = "133 (EHWPOISON)", // "Memory page has hardware error", +}; + static const char *syslog_facility_to_name(int facility) { switch (facility) { case LOG_FAC(LOG_KERN): return "kern"; @@ -216,31 +1690,57 @@ static const char *syslog_priority_to_name(int priority) { } } +static FACET_ROW_SEVERITY syslog_priority_to_facet_severity(FACETS *facets __maybe_unused, FACET_ROW *row, void *data __maybe_unused) { + // same to + // https://github.com/systemd/systemd/blob/aab9e4b2b86905a15944a1ac81e471b5b7075932/src/basic/terminal-util.c#L1501 + // function get_log_colors() + + FACET_ROW_KEY_VALUE *priority_rkv = dictionary_get(row->dict, "PRIORITY"); + if(!priority_rkv || priority_rkv->empty) + return FACET_ROW_SEVERITY_NORMAL; + + int priority = str2i(buffer_tostring(priority_rkv->wb)); + + if(priority <= LOG_ERR) + return FACET_ROW_SEVERITY_CRITICAL; + + else if (priority <= LOG_WARNING) + return FACET_ROW_SEVERITY_WARNING; + + else if(priority <= LOG_NOTICE) + return FACET_ROW_SEVERITY_NOTICE; + + else if(priority >= LOG_DEBUG) + return FACET_ROW_SEVERITY_DEBUG; + + return FACET_ROW_SEVERITY_NORMAL; +} + static char *uid_to_username(uid_t uid, char *buffer, size_t buffer_size) { - struct passwd pw, *result; - char tmp[1024 + 1]; + static __thread char tmp[1024 + 1]; + struct passwd pw, *result = NULL; - if (getpwuid_r(uid, &pw, tmp, 1024, &result) != 0 || result == NULL) - return NULL; + if (getpwuid_r(uid, &pw, tmp, sizeof(tmp), &result) != 0 || !result || !pw.pw_name || !(*pw.pw_name)) + snprintfz(buffer, buffer_size - 1, "%u", uid); + else + snprintfz(buffer, buffer_size - 1, "%u (%s)", uid, pw.pw_name); - strncpy(buffer, pw.pw_name, buffer_size - 1); - buffer[buffer_size - 1] = '\0'; // Null-terminate just in case return buffer; } static char *gid_to_groupname(gid_t gid, char* buffer, size_t buffer_size) { - struct group grp, *result; - char tmp[1024 + 1]; + static __thread char tmp[1024]; + struct group grp, *result = NULL; - if (getgrgid_r(gid, &grp, tmp, 1024, &result) != 0 || result == NULL) - return NULL; + if (getgrgid_r(gid, &grp, tmp, sizeof(tmp), &result) != 0 || !result || !grp.gr_name || !(*grp.gr_name)) + snprintfz(buffer, buffer_size - 1, "%u", gid); + else + snprintfz(buffer, buffer_size - 1, "%u (%s)", gid, grp.gr_name); - strncpy(buffer, grp.gr_name, buffer_size - 1); - buffer[buffer_size - 1] = '\0'; // Null-terminate just in case return buffer; } -static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int facility = str2i(buffer_tostring(wb)); @@ -252,7 +1752,10 @@ static void systemd_journal_transform_syslog_facility(FACETS *facets __maybe_unu } } -static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, void *data __maybe_unused) { +static void netdata_systemd_journal_transform_priority(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { int priority = str2i(buffer_tostring(wb)); @@ -264,141 +1767,663 @@ static void systemd_journal_transform_priority(FACETS *facets __maybe_unused, BU } } -static void systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { - DICTIONARY *cache = data; +static void netdata_systemd_journal_transform_errno(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { - const char *sv = dictionary_get(cache, v); - if(!sv) { - char buf[1024 + 1]; - int uid = str2i(buffer_tostring(wb)); - const char *name = uid_to_username(uid, buf, 1024); - if (!name) - name = v; + unsigned err_no = str2u(buffer_tostring(wb)); + if(err_no > 0 && err_no < sizeof(errno_map) / sizeof(*errno_map)) { + const char *name = errno_map[err_no]; + if(name) { + buffer_flush(wb); + buffer_strcat(wb, name); + } + } + } +} + +// ---------------------------------------------------------------------------- +// UID and GID transformation + +#define UID_GID_HASHTABLE_SIZE 10000 + +struct word_t2str_hashtable_entry { + struct word_t2str_hashtable_entry *next; + Word_t hash; + size_t len; + char str[]; +}; + +struct word_t2str_hashtable { + SPINLOCK spinlock; + size_t size; + struct word_t2str_hashtable_entry *hashtable[UID_GID_HASHTABLE_SIZE]; +}; + +struct word_t2str_hashtable uid_hashtable = { + .size = UID_GID_HASHTABLE_SIZE, +}; + +struct word_t2str_hashtable gid_hashtable = { + .size = UID_GID_HASHTABLE_SIZE, +}; + +struct word_t2str_hashtable_entry **word_t2str_hashtable_slot(struct word_t2str_hashtable *ht, Word_t hash) { + size_t slot = hash % ht->size; + struct word_t2str_hashtable_entry **e = &ht->hashtable[slot]; + + while(*e && (*e)->hash != hash) + e = &((*e)->next); + + return e; +} + +const char *uid_to_username_cached(uid_t uid, size_t *length) { + spinlock_lock(&uid_hashtable.spinlock); + + struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&uid_hashtable, uid); + if(!(*e)) { + static __thread char buf[1024]; + const char *name = uid_to_username(uid, buf, sizeof(buf)); + size_t size = strlen(name) + 1; + + *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); + (*e)->len = size - 1; + (*e)->hash = uid; + memcpy((*e)->str, name, size); + } + + spinlock_unlock(&uid_hashtable.spinlock); - sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1); + *length = (*e)->len; + return (*e)->str; +} + +const char *gid_to_groupname_cached(gid_t gid, size_t *length) { + spinlock_lock(&gid_hashtable.spinlock); + + struct word_t2str_hashtable_entry **e = word_t2str_hashtable_slot(&gid_hashtable, gid); + if(!(*e)) { + static __thread char buf[1024]; + const char *name = gid_to_groupname(gid, buf, sizeof(buf)); + size_t size = strlen(name) + 1; + + *e = callocz(1, sizeof(struct word_t2str_hashtable_entry) + size); + (*e)->len = size - 1; + (*e)->hash = gid; + memcpy((*e)->str, name, size); + } + + spinlock_unlock(&gid_hashtable.spinlock); + + *length = (*e)->len; + return (*e)->str; +} + +DICTIONARY *boot_ids_to_first_ut = NULL; + +static void netdata_systemd_journal_transform_boot_id(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + const char *boot_id = buffer_tostring(wb); + if(*boot_id && isxdigit(*boot_id)) { + usec_t ut = UINT64_MAX; + usec_t *p_ut = dictionary_get(boot_ids_to_first_ut, boot_id); + if(!p_ut) { + struct journal_file *jf; + dfe_start_read(journal_files_registry, jf) { + const char *files[2] = { + [0] = jf_dfe.name, + [1] = NULL, + }; + + sd_journal *j = NULL; + if(sd_journal_open_files(&j, files, ND_SD_JOURNAL_OPEN_FLAGS) < 0 || !j) + continue; + + char m[100]; + size_t len = snprintfz(m, sizeof(m), "_BOOT_ID=%s", boot_id); + usec_t t_ut = 0; + if(sd_journal_add_match(j, m, len) < 0 || + sd_journal_seek_head(j) < 0 || + sd_journal_next(j) < 0 || + sd_journal_get_realtime_usec(j, &t_ut) < 0 || !t_ut) { + sd_journal_close(j); + continue; + } + + if(t_ut < ut) + ut = t_ut; + + sd_journal_close(j); + } + dfe_done(jf); + + dictionary_set(boot_ids_to_first_ut, boot_id, &ut, sizeof(ut)); } + else + ut = *p_ut; + + if(ut != UINT64_MAX) { + time_t timestamp_sec = (time_t)(ut / USEC_PER_SEC); + struct tm tm; + char buffer[30]; + + gmtime_r(×tamp_sec, &tm); + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm); + + switch(scope) { + default: + case FACETS_TRANSFORM_DATA: + case FACETS_TRANSFORM_VALUE: + buffer_sprintf(wb, " (%s UTC) ", buffer); + break; + + case FACETS_TRANSFORM_FACET: + case FACETS_TRANSFORM_FACET_SORT: + case FACETS_TRANSFORM_HISTOGRAM: + buffer_flush(wb); + buffer_sprintf(wb, "%s UTC", buffer); + break; + } + } + } +} - buffer_flush(wb); - buffer_strcat(wb, sv); +static void netdata_systemd_journal_transform_uid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + + const char *v = buffer_tostring(wb); + if(*v && isdigit(*v)) { + uid_t uid = str2i(buffer_tostring(wb)); + size_t len; + const char *name = uid_to_username_cached(uid, &len); + buffer_contents_replace(wb, name, len); } } -static void systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, void *data) { - DICTIONARY *cache = data; +static void netdata_systemd_journal_transform_gid(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + const char *v = buffer_tostring(wb); if(*v && isdigit(*v)) { - const char *sv = dictionary_get(cache, v); - if(!sv) { - char buf[1024 + 1]; - int gid = str2i(buffer_tostring(wb)); - const char *name = gid_to_groupname(gid, buf, 1024); - if (!name) - name = v; + gid_t gid = str2i(buffer_tostring(wb)); + size_t len; + const char *name = gid_to_groupname_cached(gid, &len); + buffer_contents_replace(wb, name, len); + } +} - sv = dictionary_set(cache, v, (void *)name, strlen(name) + 1); +const char *linux_capabilities[] = { + [CAP_CHOWN] = "CHOWN", + [CAP_DAC_OVERRIDE] = "DAC_OVERRIDE", + [CAP_DAC_READ_SEARCH] = "DAC_READ_SEARCH", + [CAP_FOWNER] = "FOWNER", + [CAP_FSETID] = "FSETID", + [CAP_KILL] = "KILL", + [CAP_SETGID] = "SETGID", + [CAP_SETUID] = "SETUID", + [CAP_SETPCAP] = "SETPCAP", + [CAP_LINUX_IMMUTABLE] = "LINUX_IMMUTABLE", + [CAP_NET_BIND_SERVICE] = "NET_BIND_SERVICE", + [CAP_NET_BROADCAST] = "NET_BROADCAST", + [CAP_NET_ADMIN] = "NET_ADMIN", + [CAP_NET_RAW] = "NET_RAW", + [CAP_IPC_LOCK] = "IPC_LOCK", + [CAP_IPC_OWNER] = "IPC_OWNER", + [CAP_SYS_MODULE] = "SYS_MODULE", + [CAP_SYS_RAWIO] = "SYS_RAWIO", + [CAP_SYS_CHROOT] = "SYS_CHROOT", + [CAP_SYS_PTRACE] = "SYS_PTRACE", + [CAP_SYS_PACCT] = "SYS_PACCT", + [CAP_SYS_ADMIN] = "SYS_ADMIN", + [CAP_SYS_BOOT] = "SYS_BOOT", + [CAP_SYS_NICE] = "SYS_NICE", + [CAP_SYS_RESOURCE] = "SYS_RESOURCE", + [CAP_SYS_TIME] = "SYS_TIME", + [CAP_SYS_TTY_CONFIG] = "SYS_TTY_CONFIG", + [CAP_MKNOD] = "MKNOD", + [CAP_LEASE] = "LEASE", + [CAP_AUDIT_WRITE] = "AUDIT_WRITE", + [CAP_AUDIT_CONTROL] = "AUDIT_CONTROL", + [CAP_SETFCAP] = "SETFCAP", + [CAP_MAC_OVERRIDE] = "MAC_OVERRIDE", + [CAP_MAC_ADMIN] = "MAC_ADMIN", + [CAP_SYSLOG] = "SYSLOG", + [CAP_WAKE_ALARM] = "WAKE_ALARM", + [CAP_BLOCK_SUSPEND] = "BLOCK_SUSPEND", + [37 /*CAP_AUDIT_READ*/] = "AUDIT_READ", + [38 /*CAP_PERFMON*/] = "PERFMON", + [39 /*CAP_BPF*/] = "BPF", + [40 /* CAP_CHECKPOINT_RESTORE */] = "CHECKPOINT_RESTORE", +}; + +static void netdata_systemd_journal_transform_cap_effective(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + + const char *v = buffer_tostring(wb); + if(*v && isdigit(*v)) { + uint64_t cap = strtoul(buffer_tostring(wb), NULL, 16); + if(cap) { + buffer_fast_strcat(wb, " (", 2); + for (size_t i = 0, added = 0; i < sizeof(linux_capabilities) / sizeof(linux_capabilities[0]); i++) { + if (linux_capabilities[i] && (cap & (1ULL << i))) { + + if (added) + buffer_fast_strcat(wb, " | ", 3); + + buffer_strcat(wb, linux_capabilities[i]); + added++; + } + } + buffer_fast_strcat(wb, ")", 1); } + } +} - buffer_flush(wb); - buffer_strcat(wb, sv); +static void netdata_systemd_journal_transform_timestamp_usec(FACETS *facets __maybe_unused, BUFFER *wb, FACETS_TRANSFORMATION_SCOPE scope __maybe_unused, void *data __maybe_unused) { + if(scope == FACETS_TRANSFORM_FACET_SORT) + return; + + const char *v = buffer_tostring(wb); + if(*v && isdigit(*v)) { + uint64_t ut = str2ull(buffer_tostring(wb), NULL); + if(ut) { + time_t timestamp_sec = ut / USEC_PER_SEC; + struct tm tm; + char buffer[30]; + + gmtime_r(×tamp_sec, &tm); + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &tm); + buffer_sprintf(wb, " (%s.%06llu UTC)", buffer, ut % USEC_PER_SEC); + } } } -static void systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { +// ---------------------------------------------------------------------------- + +static void netdata_systemd_journal_dynamic_row_id(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row, void *data __maybe_unused) { FACET_ROW_KEY_VALUE *pid_rkv = dictionary_get(row->dict, "_PID"); const char *pid = pid_rkv ? buffer_tostring(pid_rkv->wb) : FACET_VALUE_UNSET; - FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER"); - const char *identifier = syslog_identifier_rkv ? buffer_tostring(syslog_identifier_rkv->wb) : FACET_VALUE_UNSET; + const char *identifier = NULL; + FACET_ROW_KEY_VALUE *container_name_rkv = dictionary_get(row->dict, "CONTAINER_NAME"); + if(container_name_rkv && !container_name_rkv->empty) + identifier = buffer_tostring(container_name_rkv->wb); - if(strcmp(identifier, FACET_VALUE_UNSET) == 0) { - FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM"); - identifier = comm_rkv ? buffer_tostring(comm_rkv->wb) : FACET_VALUE_UNSET; + if(!identifier) { + FACET_ROW_KEY_VALUE *syslog_identifier_rkv = dictionary_get(row->dict, "SYSLOG_IDENTIFIER"); + if(syslog_identifier_rkv && !syslog_identifier_rkv->empty) + identifier = buffer_tostring(syslog_identifier_rkv->wb); + + if(!identifier) { + FACET_ROW_KEY_VALUE *comm_rkv = dictionary_get(row->dict, "_COMM"); + if(comm_rkv && !comm_rkv->empty) + identifier = buffer_tostring(comm_rkv->wb); + } } buffer_flush(rkv->wb); - if(strcmp(pid, FACET_VALUE_UNSET) == 0) - buffer_strcat(rkv->wb, identifier); + if(!identifier) + buffer_strcat(rkv->wb, FACET_VALUE_UNSET); else buffer_sprintf(rkv->wb, "%s[%s]", identifier, pid); buffer_json_add_array_item_string(json_array, buffer_tostring(rkv->wb)); } -static void function_systemd_journal(const char *transaction, char *function, char *line_buffer __maybe_unused, int line_max __maybe_unused, int timeout __maybe_unused) { - char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS); +static void netdata_systemd_journal_rich_message(FACETS *facets __maybe_unused, BUFFER *json_array, FACET_ROW_KEY_VALUE *rkv, FACET_ROW *row __maybe_unused, void *data __maybe_unused) { + buffer_json_add_array_item_object(json_array); + buffer_json_member_add_string(json_array, "value", buffer_tostring(rkv->wb)); + buffer_json_object_close(json_array); +} + +DICTIONARY *function_query_status_dict = NULL; + +static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) { + if(!progress_id || !(*progress_id)) { + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id"); + netdata_mutex_unlock(&stdout_mutex); + return; + } + + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id); + + if(!item) { + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here"); + netdata_mutex_unlock(&stdout_mutex); + return; + } + + FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item); + + usec_t now_monotonic_ut = now_monotonic_usec(); + if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut) + fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC; + + usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut; + + size_t files_matched = fqs->files_matched; + size_t file_working = fqs->file_working; + if(file_working > files_matched) + files_matched = file_working; + + size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED); + size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED); - BUFFER *wb = buffer_create(0, NULL); buffer_flush(wb); - buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_NEWLINE_ON_ARRAY_ITEMS); + buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); + buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); + buffer_json_member_add_string(wb, "type", "table"); + buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut); + buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched); + char msg[1024 + 1]; + snprintfz(msg, 1024, + "Read %zu rows (%0.0f rows/s), " + "data %0.1f MB (%0.1f MB/s), " + "file %zu of %zu", + rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC, + (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0, + file_working, files_matched + ); + buffer_json_member_add_string(wb, "message", msg); + buffer_json_finalize(wb); + + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb); + netdata_mutex_unlock(&stdout_mutex); - FACETS *facets = facets_create(50, 0, FACETS_OPTION_ALL_KEYS_FTS, + dictionary_acquired_item_release(function_query_status_dict, item); +} + +static void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { + fstat_thread_calls = 0; + fstat_thread_cached_responses = 0; + journal_files_registry_update(); + + BUFFER *wb = buffer_create(0, NULL); + buffer_flush(wb); + buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); + + usec_t now_monotonic_ut = now_monotonic_usec(); + FUNCTION_QUERY_STATUS tmp_fqs = { + .cancelled = cancelled, + .started_monotonic_ut = now_monotonic_ut, + .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC), + }; + FUNCTION_QUERY_STATUS *fqs = NULL; + const DICTIONARY_ITEM *fqs_item = NULL; + + FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS, SYSTEMD_ALWAYS_VISIBLE_KEYS, SYSTEMD_KEYS_INCLUDED_IN_FACETS, SYSTEMD_KEYS_EXCLUDED_FROM_FACETS); + facets_accepted_param(facets, JOURNAL_PARAMETER_INFO); + facets_accepted_param(facets, JOURNAL_PARAMETER_SOURCE); facets_accepted_param(facets, JOURNAL_PARAMETER_AFTER); facets_accepted_param(facets, JOURNAL_PARAMETER_BEFORE); facets_accepted_param(facets, JOURNAL_PARAMETER_ANCHOR); + facets_accepted_param(facets, JOURNAL_PARAMETER_DIRECTION); facets_accepted_param(facets, JOURNAL_PARAMETER_LAST); facets_accepted_param(facets, JOURNAL_PARAMETER_QUERY); + facets_accepted_param(facets, JOURNAL_PARAMETER_FACETS); + facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM); + facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE); + facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY); + facets_accepted_param(facets, JOURNAL_PARAMETER_ID); + facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS); + facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA); + facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL); + +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS + facets_accepted_param(facets, JOURNAL_PARAMETER_SLICE); +#endif // HAVE_SD_JOURNAL_RESTART_FIELDS // register the fields in the order you want them on the dashboard - facets_register_dynamic_key(facets, "ND_JOURNAL_PROCESS", FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS, - systemd_journal_dynamic_row_id, NULL); + facets_register_row_severity(facets, syslog_priority_to_facet_severity, NULL); + + facets_register_key_name(facets, "_HOSTNAME", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + + facets_register_dynamic_key_name(facets, JOURNAL_KEY_ND_JOURNAL_PROCESS, + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, + netdata_systemd_journal_dynamic_row_id, NULL); + + facets_register_key_name(facets, "MESSAGE", + FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | + FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS); + +// facets_register_dynamic_key_name(facets, "MESSAGE", +// FACET_KEY_OPTION_NEVER_FACET | FACET_KEY_OPTION_MAIN_TEXT | FACET_KEY_OPTION_RICH_TEXT | +// FACET_KEY_OPTION_VISIBLE | FACET_KEY_OPTION_FTS, +// netdata_systemd_journal_rich_message, NULL); - facets_register_key(facets, "MESSAGE", - FACET_KEY_OPTION_NO_FACET|FACET_KEY_OPTION_MAIN_TEXT|FACET_KEY_OPTION_VISIBLE|FACET_KEY_OPTION_FTS); + facets_register_key_name_transformation(facets, "PRIORITY", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_priority, NULL); - facets_register_key_transformation(facets, "PRIORITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_priority, NULL); + facets_register_key_name_transformation(facets, "SYSLOG_FACILITY", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_syslog_facility, NULL); - facets_register_key_transformation(facets, "SYSLOG_FACILITY", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_syslog_facility, NULL); + facets_register_key_name_transformation(facets, "ERRNO", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_errno, NULL); - facets_register_key(facets, "SYSLOG_IDENTIFIER", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS); - facets_register_key(facets, "UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS); - facets_register_key(facets, "USER_UNIT", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS); + facets_register_key_name(facets, JOURNAL_KEY_ND_JOURNAL_FILE, + FACET_KEY_OPTION_NEVER_FACET); - facets_register_key_transformation(facets, "_UID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_uid, uids); + facets_register_key_name(facets, "SYSLOG_IDENTIFIER", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); - facets_register_key_transformation(facets, "_GID", FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS, - systemd_journal_transform_gid, gids); + facets_register_key_name(facets, "UNIT", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + facets_register_key_name(facets, "USER_UNIT", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS); + + facets_register_key_name_transformation(facets, "_BOOT_ID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_boot_id, NULL); + + facets_register_key_name_transformation(facets, "_SYSTEMD_OWNER_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_SYSTEMD_OWNER_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_UID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "_GID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_gid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_GID", + FACET_KEY_OPTION_FACET | FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_gid, NULL); + + facets_register_key_name_transformation(facets, "_CAP_EFFECTIVE", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_cap_effective, NULL); + + facets_register_key_name_transformation(facets, "_AUDIT_LOGINUID", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "OBJECT_AUDIT_LOGINUID", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_uid, NULL); + + facets_register_key_name_transformation(facets, "_SOURCE_REALTIME_TIMESTAMP", + FACET_KEY_OPTION_FTS | FACET_KEY_OPTION_TRANSFORM_VIEW, + netdata_systemd_journal_transform_timestamp_usec, NULL); + + // ------------------------------------------------------------------------ + // parse the parameters + + bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false; time_t after_s = 0, before_s = 0; usec_t anchor = 0; + usec_t if_modified_since = 0; size_t last = 0; + FACETS_ANCHOR_DIRECTION direction = JOURNAL_DEFAULT_DIRECTION; const char *query = NULL; + const char *chart = NULL; + const char *source = NULL; + const char *progress_id = NULL; + SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL; + size_t filters = 0; - buffer_json_member_add_object(wb, "request"); - buffer_json_member_add_object(wb, "filters"); + buffer_json_member_add_object(wb, "_request"); + char *words[SYSTEMD_JOURNAL_MAX_PARAMS] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd(function, words, SYSTEMD_JOURNAL_MAX_PARAMS); for(int i = 1; i < SYSTEMD_JOURNAL_MAX_PARAMS ;i++) { - const char *keyword = get_word(words, num_words, i); + char *keyword = get_word(words, num_words, i); if(!keyword) break; if(strcmp(keyword, JOURNAL_PARAMETER_HELP) == 0) { - systemd_journal_function_help(transaction); + netdata_systemd_journal_function_help(transaction); goto cleanup; } - else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", strlen(JOURNAL_PARAMETER_AFTER ":")) == 0) { - after_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_AFTER ":")]); + else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) { + info = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", strlen(JOURNAL_PARAMETER_BEFORE ":")) == 0) { - before_s = str2l(&keyword[strlen(JOURNAL_PARAMETER_BEFORE ":")]); + else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) { + progress = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", strlen(JOURNAL_PARAMETER_ANCHOR ":")) == 0) { - anchor = str2ull(&keyword[strlen(JOURNAL_PARAMETER_ANCHOR ":")], NULL); + else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + delta = false; + else + delta = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", strlen(JOURNAL_PARAMETER_LAST ":")) == 0) { - last = str2ul(&keyword[strlen(JOURNAL_PARAMETER_LAST ":")]); + else if(strncmp(keyword, JOURNAL_PARAMETER_TAIL ":", sizeof(JOURNAL_PARAMETER_TAIL ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_TAIL ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + tail = false; + else + tail = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", strlen(JOURNAL_PARAMETER_QUERY ":")) == 0) { - query= &keyword[strlen(JOURNAL_PARAMETER_QUERY ":")]; + else if(strncmp(keyword, JOURNAL_PARAMETER_DATA_ONLY ":", sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_DATA_ONLY ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + data_only = false; + else + data_only = true; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_SLICE ":", sizeof(JOURNAL_PARAMETER_SLICE ":") - 1) == 0) { + char *v = &keyword[sizeof(JOURNAL_PARAMETER_SLICE ":") - 1]; + + if(strcmp(v, "false") == 0 || strcmp(v, "no") == 0 || strcmp(v, "0") == 0) + slice = false; + else + slice = true; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) { + char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1]; + + if(*id) + progress_id = id; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) { + source = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; + + if(strcmp(source, SDJF_SOURCE_ALL_NAME) == 0) { + source_type = SDJF_ALL; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_NAME) == 0) { + source_type = SDJF_LOCAL; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_REMOTES_NAME) == 0) { + source_type = SDJF_REMOTE; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_NAMESPACES_NAME) == 0) { + source_type = SDJF_NAMESPACE; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_SYSTEM_NAME) == 0) { + source_type = SDJF_LOCAL | SDJF_SYSTEM; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_USERS_NAME) == 0) { + source_type = SDJF_LOCAL | SDJF_USER; + source = NULL; + } + else if(strcmp(source, SDJF_SOURCE_LOCAL_OTHER_NAME) == 0) { + source_type = SDJF_LOCAL | SDJF_OTHER; + source = NULL; + } + else { + source_type = SDJF_ALL; + // else, match the source, whatever it is + } + } + else if(strncmp(keyword, JOURNAL_PARAMETER_AFTER ":", sizeof(JOURNAL_PARAMETER_AFTER ":") - 1) == 0) { + after_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_AFTER ":") - 1]); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_BEFORE ":", sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1) == 0) { + before_s = str2l(&keyword[sizeof(JOURNAL_PARAMETER_BEFORE ":") - 1]); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":", sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1) == 0) { + if_modified_since = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_IF_MODIFIED_SINCE ":") - 1], NULL); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_ANCHOR ":", sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1) == 0) { + anchor = str2ull(&keyword[sizeof(JOURNAL_PARAMETER_ANCHOR ":") - 1], NULL); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_DIRECTION ":", sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1) == 0) { + direction = strcasecmp(&keyword[sizeof(JOURNAL_PARAMETER_DIRECTION ":") - 1], "forward") == 0 ? FACETS_ANCHOR_DIRECTION_FORWARD : FACETS_ANCHOR_DIRECTION_BACKWARD; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_LAST ":", sizeof(JOURNAL_PARAMETER_LAST ":") - 1) == 0) { + last = str2ul(&keyword[sizeof(JOURNAL_PARAMETER_LAST ":") - 1]); + } + else if(strncmp(keyword, JOURNAL_PARAMETER_QUERY ":", sizeof(JOURNAL_PARAMETER_QUERY ":") - 1) == 0) { + query= &keyword[sizeof(JOURNAL_PARAMETER_QUERY ":") - 1]; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_HISTOGRAM ":", sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1) == 0) { + chart = &keyword[sizeof(JOURNAL_PARAMETER_HISTOGRAM ":") - 1]; + } + else if(strncmp(keyword, JOURNAL_PARAMETER_FACETS ":", sizeof(JOURNAL_PARAMETER_FACETS ":") - 1) == 0) { + char *value = &keyword[sizeof(JOURNAL_PARAMETER_FACETS ":") - 1]; + if(*value) { + buffer_json_member_add_array(wb, JOURNAL_PARAMETER_FACETS); + + while(value) { + char *sep = strchr(value, ','); + if(sep) + *sep++ = '\0'; + + facets_register_facet_id(facets, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER); + buffer_json_add_array_item_string(wb, value); + + value = sep; + } + + buffer_json_array_close(wb); // JOURNAL_PARAMETER_FACETS + } } else { char *value = strchr(keyword, ':'); @@ -412,8 +2437,9 @@ static void function_systemd_journal(const char *transaction, char *function, ch if(sep) *sep++ = '\0'; - facets_register_facet_filter(facets, keyword, value, FACET_KEY_OPTION_REORDER); + facets_register_facet_id_filter(facets, keyword, value, FACET_KEY_OPTION_FACET|FACET_KEY_OPTION_FTS|FACET_KEY_OPTION_REORDER); buffer_json_add_array_item_string(wb, value); + filters++; value = sep; } @@ -423,18 +2449,31 @@ static void function_systemd_journal(const char *transaction, char *function, ch } } - buffer_json_object_close(wb); // filters + // ------------------------------------------------------------------------ + // put this request into the progress db + + if(progress_id && *progress_id) { + fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs)); + fqs = dictionary_acquired_item_value(fqs_item); + } + else { + // no progress id given, proceed without registering our progress in the dictionary + fqs = &tmp_fqs; + fqs_item = NULL; + } + + // ------------------------------------------------------------------------ + // validate parameters - time_t expires = now_realtime_sec() + 1; - time_t now_s; + time_t now_s = now_realtime_sec(); + time_t expires = now_s + 1; if(!after_s && !before_s) { - now_s = now_realtime_sec(); before_s = now_s; after_s = before_s - SYSTEMD_JOURNAL_DEFAULT_QUERY_DURATION; } else - rrdr_relative_window_to_absolute(&after_s, &before_s, &now_s, false); + rrdr_relative_window_to_absolute(&after_s, &before_s, now_s); if(after_s > before_s) { time_t tmp = after_s; @@ -448,85 +2487,179 @@ static void function_systemd_journal(const char *transaction, char *function, ch if(!last) last = SYSTEMD_JOURNAL_DEFAULT_ITEMS_PER_QUERY; - buffer_json_member_add_time_t(wb, "after", after_s); - buffer_json_member_add_time_t(wb, "before", before_s); - buffer_json_member_add_uint64(wb, "anchor", anchor); - buffer_json_member_add_uint64(wb, "last", last); - buffer_json_member_add_string(wb, "query", query); - buffer_json_member_add_time_t(wb, "timeout", timeout); - buffer_json_object_close(wb); // request - - facets_set_items(facets, last); - facets_set_anchor(facets, anchor); - facets_set_query(facets, query); - int response = systemd_journal_query(wb, facets, after_s * USEC_PER_SEC, before_s * USEC_PER_SEC, - now_monotonic_usec() + (timeout - 1) * USEC_PER_SEC); - if(response != HTTP_RESP_OK) { - pluginsd_function_json_error(transaction, response, "failed"); - goto cleanup; + // ------------------------------------------------------------------------ + // set query time-frame, anchors and direction + + fqs->after_ut = after_s * USEC_PER_SEC; + fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1; + fqs->if_modified_since = if_modified_since; + fqs->data_only = data_only; + fqs->delta = (fqs->data_only) ? delta : false; + fqs->tail = (fqs->data_only && fqs->if_modified_since) ? tail : false; + fqs->source = string_strdupz(source); + fqs->source_type = source_type; + fqs->entries = last; + fqs->last_modified = 0; + fqs->filters = filters; + fqs->query = (query && *query) ? query : NULL; + fqs->histogram = (chart && *chart) ? chart : NULL; + fqs->direction = direction; + fqs->anchor.start_ut = anchor; + fqs->anchor.stop_ut = 0; + + if(fqs->anchor.start_ut && fqs->tail) { + // a tail request + // we need the top X entries from BEFORE + // but, we need to calculate the facets and the + // histogram up to the anchor + fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD; + fqs->anchor.start_ut = 0; + fqs->anchor.stop_ut = anchor; } - pluginsd_function_result_begin_to_stdout(transaction, HTTP_RESP_OK, "application/json", expires); - fwrite(buffer_tostring(wb), buffer_strlen(wb), 1, stdout); + if(anchor && anchor < fqs->after_ut) { + log_fqs(fqs, "received anchor is too small for query timeframe, ignoring anchor"); + anchor = 0; + fqs->anchor.start_ut = 0; + fqs->anchor.stop_ut = 0; + fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD; + } + else if(anchor > fqs->before_ut) { + log_fqs(fqs, "received anchor is too big for query timeframe, ignoring anchor"); + anchor = 0; + fqs->anchor.start_ut = 0; + fqs->anchor.stop_ut = 0; + fqs->direction = direction = FACETS_ANCHOR_DIRECTION_BACKWARD; + } - pluginsd_function_result_end_to_stdout(); + facets_set_anchor(facets, fqs->anchor.start_ut, fqs->anchor.stop_ut, fqs->direction); -cleanup: - facets_destroy(facets); - buffer_free(wb); -} + facets_set_additional_options(facets, + ((fqs->data_only) ? FACETS_OPTION_DATA_ONLY : 0) | + ((fqs->delta) ? FACETS_OPTION_SHOW_DELTAS : 0)); -static void *reader_main(void *arg __maybe_unused) { - char buffer[PLUGINSD_LINE_MAX + 1]; + // ------------------------------------------------------------------------ + // set the rest of the query parameters - char *s = NULL; - while(!plugin_should_exit && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, PLUGINSD_MAX_WORDS); + facets_set_items(facets, fqs->entries); + facets_set_query(facets, fqs->query); - const char *keyword = get_word(words, num_words, 0); +#ifdef HAVE_SD_JOURNAL_RESTART_FIELDS + fqs->slice = slice; + if(slice) + facets_enable_slice_mode(facets); +#else + fqs->slice = false; +#endif - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + if(fqs->histogram) + facets_set_timeframe_and_histogram_by_id(facets, fqs->histogram, fqs->after_ut, fqs->before_ut); + else + facets_set_timeframe_and_histogram_by_name(facets, "PRIORITY", fqs->after_ut, fqs->before_ut); - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - keyword, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - if(timeout <= 0) timeout = SYSTEMD_JOURNAL_DEFAULT_TIMEOUT; - netdata_mutex_lock(&mutex); + // ------------------------------------------------------------------------ + // complete the request object + + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta); + buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_SOURCE, string2str(fqs->source)); + buffer_json_member_add_uint64(wb, "source_type", fqs->source_type); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC); + buffer_json_member_add_uint64(wb, "if_modified_since", fqs->if_modified_since); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_ANCHOR, anchor); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_DIRECTION, fqs->direction == FACETS_ANCHOR_DIRECTION_FORWARD ? "forward" : "backward"); + buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_LAST, fqs->entries); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_QUERY, fqs->query); + buffer_json_member_add_string(wb, JOURNAL_PARAMETER_HISTOGRAM, fqs->histogram); + buffer_json_object_close(wb); // request - if(strncmp(function, SYSTEMD_JOURNAL_FUNCTION_NAME, strlen(SYSTEMD_JOURNAL_FUNCTION_NAME)) == 0) - function_systemd_journal(transaction, function, buffer, PLUGINSD_LINE_MAX + 1, timeout); - else - pluginsd_function_json_error(transaction, HTTP_RESP_NOT_FOUND, "No function with this name found in systemd-journal.plugin."); + buffer_json_journal_versions(wb); - fflush(stdout); - netdata_mutex_unlock(&mutex); + // ------------------------------------------------------------------------ + // run the request + + int response; + + if(info) { + facets_accepted_parameters_to_json_array(facets, wb, false); + buffer_json_member_add_array(wb, "required_params"); + { + buffer_json_add_array_item_object(wb); + { + buffer_json_member_add_string(wb, "id", "source"); + buffer_json_member_add_string(wb, "name", "source"); + buffer_json_member_add_string(wb, "help", "Select the SystemD Journal source to query"); + buffer_json_member_add_string(wb, "type", "select"); + buffer_json_member_add_array(wb, "options"); + { + available_journal_file_sources_to_json_array(wb); + } + buffer_json_array_close(wb); // options array } + buffer_json_object_close(wb); // required params object } - else - netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); + buffer_json_array_close(wb); // required_params array + + facets_table_config(wb); + + buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); + buffer_json_member_add_string(wb, "type", "table"); + buffer_json_member_add_string(wb, "help", SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); + buffer_json_finalize(wb); + response = HTTP_RESP_OK; + goto output; } - if(!s || feof(stdin) || ferror(stdin)) { - plugin_should_exit = true; - netdata_log_error("Received error on stdin."); + if(progress) { + function_systemd_journal_progress(wb, transaction, progress_id); + goto cleanup; } - exit(1); + response = netdata_systemd_journal_query(wb, facets, fqs); + + // ------------------------------------------------------------------------ + // cleanup query params + + string_freez(fqs->source); + fqs->source = NULL; + + // ------------------------------------------------------------------------ + // handle error response + + if(response != HTTP_RESP_OK) { + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, response, "failed"); + netdata_mutex_unlock(&stdout_mutex); + goto cleanup; + } + +output: + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_result_to_stdout(transaction, response, "application/json", expires, wb); + netdata_mutex_unlock(&stdout_mutex); + +cleanup: + facets_destroy(facets); + buffer_free(wb); + + if(fqs_item) { + dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item)); + dictionary_acquired_item_release(function_query_status_dict, fqs_item); + dictionary_garbage_collect(function_query_status_dict); + } } +// ---------------------------------------------------------------------------- + int main(int argc __maybe_unused, char **argv __maybe_unused) { stderror = stderr; clocks_init(); @@ -540,44 +2673,104 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { error_log_errors_per_period = 100; error_log_throttle_period = 3600; - // initialize the threads - netdata_threads_init_for_external_plugins(0); // set the default threads stack size here + log_set_global_severity_for_external_plugins(); + + netdata_configured_host_prefix = getenv("NETDATA_HOST_PREFIX"); + if(verify_netdata_host_prefix() == -1) exit(1); + + // ------------------------------------------------------------------------ + // setup the journal directories + + unsigned d = 0; + + journal_directories[d++].path = strdupz("/var/log/journal"); + journal_directories[d++].path = strdupz("/run/log/journal"); + + if(*netdata_configured_host_prefix) { + char path[PATH_MAX]; + snprintfz(path, sizeof(path), "%s/var/log/journal", netdata_configured_host_prefix); + journal_directories[d++].path = strdupz(path); + snprintfz(path, sizeof(path), "%s/run/log/journal", netdata_configured_host_prefix); + journal_directories[d++].path = strdupz(path); + } + + // terminate the list + journal_directories[d].path = NULL; + + // ------------------------------------------------------------------------ + + function_query_status_dict = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(FUNCTION_QUERY_STATUS)); + + // ------------------------------------------------------------------------ + // initialize the used hashes files registry + + used_hashes_registry = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + + + // ------------------------------------------------------------------------ + // initialize the journal files registry + + systemd_journal_session = (now_realtime_usec() / USEC_PER_SEC) * USEC_PER_SEC; + + journal_files_registry = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(struct journal_file)); + + dictionary_register_insert_callback(journal_files_registry, files_registry_insert_cb, NULL); + dictionary_register_delete_callback(journal_files_registry, files_registry_delete_cb, NULL); + dictionary_register_conflict_callback(journal_files_registry, files_registry_conflict_cb, NULL); + + boot_ids_to_first_ut = dictionary_create_advanced( + DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, + NULL, sizeof(usec_t)); + + journal_files_registry_update(); - uids = dictionary_create(0); - gids = dictionary_create(0); // ------------------------------------------------------------------------ // debug if(argc == 2 && strcmp(argv[1], "debug") == 0) { - char buf[] = "systemd-journal after:-86400 before:0 last:500"; - function_systemd_journal("123", buf, "", 0, 30); + bool cancelled = false; + char buf[] = "systemd-journal after:-16000000 before:0 last:1"; + // char buf[] = "systemd-journal after:1695332964 before:1695937764 direction:backward last:100 slice:true source:all DHKucpqUoe1:PtVoyIuX.MU"; + // char buf[] = "systemd-journal after:1694511062 before:1694514662 anchor:1694514122024403"; + function_systemd_journal("123", buf, 600, &cancelled); exit(1); } // ------------------------------------------------------------------------ + // the event loop for functions + + struct functions_evloop_globals *wg = + functions_evloop_init(SYSTEMD_JOURNAL_WORKER_THREADS, "SDJ", &stdout_mutex, &plugin_should_exit); + + functions_evloop_add_function(wg, SYSTEMD_JOURNAL_FUNCTION_NAME, function_systemd_journal, + SYSTEMD_JOURNAL_DEFAULT_TIMEOUT); - netdata_thread_t reader_thread; - netdata_thread_create(&reader_thread, "SDJ_READER", NETDATA_THREAD_OPTION_DONT_LOG, reader_main, NULL); // ------------------------------------------------------------------------ time_t started_t = now_monotonic_sec(); - size_t iteration; + size_t iteration = 0; usec_t step = 1000 * USEC_PER_MS; bool tty = isatty(fileno(stderr)) == 1; - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION " GLOBAL \"%s\" %d \"%s\"\n", SYSTEMD_JOURNAL_FUNCTION_NAME, SYSTEMD_JOURNAL_DEFAULT_TIMEOUT, SYSTEMD_JOURNAL_FUNCTION_DESCRIPTION); heartbeat_t hb; heartbeat_init(&hb); - for(iteration = 0; 1 ; iteration++) { - netdata_mutex_unlock(&mutex); + while(!plugin_should_exit) { + iteration++; + + netdata_mutex_unlock(&stdout_mutex); heartbeat_next(&hb, step); - netdata_mutex_lock(&mutex); + netdata_mutex_lock(&stdout_mutex); if(!tty) fprintf(stdout, "\n"); @@ -589,8 +2782,5 @@ int main(int argc __maybe_unused, char **argv __maybe_unused) { break; } - dictionary_destroy(uids); - dictionary_destroy(gids); - exit(0); } |