summaryrefslogtreecommitdiffstats
path: root/src/collectors/systemd-journal.plugin/systemd-journal.c
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/collectors/systemd-journal.plugin/systemd-journal.c (renamed from collectors/systemd-journal.plugin/systemd-journal.c)157
1 files changed, 28 insertions, 129 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/src/collectors/systemd-journal.plugin/systemd-journal.c
index f812b2161..84f644575 100644
--- a/collectors/systemd-journal.plugin/systemd-journal.c
+++ b/src/collectors/systemd-journal.plugin/systemd-journal.c
@@ -26,6 +26,8 @@
#define SYSTEMD_JOURNAL_SAMPLING_SLOTS 1000
#define SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE 10000
+#define SYSTEMD_JOURNAL_PROGRESS_EVERY_UT (250 * USEC_PER_MS)
+
#define JOURNAL_PARAMETER_HELP "help"
#define JOURNAL_PARAMETER_AFTER "after"
#define JOURNAL_PARAMETER_BEFORE "before"
@@ -39,8 +41,6 @@
#define JOURNAL_PARAMETER_DATA_ONLY "data_only"
#define JOURNAL_PARAMETER_SOURCE "source"
#define JOURNAL_PARAMETER_INFO "info"
-#define JOURNAL_PARAMETER_ID "id"
-#define JOURNAL_PARAMETER_PROGRESS "progress"
#define JOURNAL_PARAMETER_SLICE "slice"
#define JOURNAL_PARAMETER_DELTA "delta"
#define JOURNAL_PARAMETER_TAIL "tail"
@@ -183,11 +183,11 @@
typedef struct function_query_status {
bool *cancelled; // a pointer to the cancelling boolean
- usec_t stop_monotonic_ut;
-
- usec_t started_monotonic_ut;
+ usec_t *stop_monotonic_ut;
// request
+ const char *transaction;
+
SD_JOURNAL_FILE_SOURCE_TYPE source_type;
SIMPLE_PATTERN *sources;
usec_t after_ut;
@@ -518,12 +518,12 @@ static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCT
return remaining_logs_by_time;
}
-static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
- size_t expected_matching_logs_by_seqnum = 0;
- double proportion_by_seqnum = 0.0;
+static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j __maybe_unused, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) {
size_t remaining_logs_by_seqnum = 0;
#ifdef HAVE_SD_JOURNAL_GET_SEQNUM
+ size_t expected_matching_logs_by_seqnum = 0;
+ double proportion_by_seqnum = 0.0;
uint64_t current_msg_seqnum;
sd_id128_t current_msg_writer;
if(!fqs->query_file.first_msg_seqnum || sd_journal_get_seqnum(j, &current_msg_seqnum, &current_msg_writer) < 0) {
@@ -807,7 +807,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward(
FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
last_bytes = bytes;
- status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ status = check_stop(fqs->cancelled, fqs->stop_monotonic_ut);
}
}
else if(sample == SAMPLING_SKIP_FIELDS)
@@ -914,7 +914,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward(
FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes);
last_bytes = bytes;
- status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut);
+ status = check_stop(fqs->cancelled, fqs->stop_monotonic_ut);
}
}
else if(sample == SAMPLING_SKIP_FIELDS)
@@ -1085,7 +1085,7 @@ static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) {
if((fqs->source_type == SDJF_NONE && !fqs->sources) || (jf->source_type & fqs->source_type) ||
(fqs->sources && simple_pattern_matches(fqs->sources, string2str(jf->source)))) {
- if(!jf->msg_last_ut || !jf->msg_last_ut)
+ if(!jf->msg_last_ut)
// the file is not scanned yet, or the timestamps have not been updated,
// so we don't know if it can contribute or not - let's add it.
return true;
@@ -1150,6 +1150,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
usec_t started_ut = query_started_ut;
usec_t ended_ut = started_ut;
usec_t duration_ut = 0, max_duration_ut = 0;
+ usec_t progress_duration_ut = 0;
sampling_query_init(fqs, facets);
@@ -1164,9 +1165,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
started_ut = ended_ut;
// do not even try to do the query if we expect it to pass the timeout
- if(ended_ut > (query_started_ut + (fqs->stop_monotonic_ut - query_started_ut) * 3 / 4) &&
- ended_ut + max_duration_ut * 2 >= fqs->stop_monotonic_ut) {
-
+ if(ended_ut + max_duration_ut * 3 >= *fqs->stop_monotonic_ut) {
partial = true;
status = ND_SD_JOURNAL_TIMED_OUT;
break;
@@ -1211,6 +1210,14 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU
if(duration_ut > max_duration_ut)
max_duration_ut = duration_ut;
+ progress_duration_ut += duration_ut;
+ if(progress_duration_ut >= SYSTEMD_JOURNAL_PROGRESS_EVERY_UT) {
+ progress_duration_ut = 0;
+ netdata_mutex_lock(&stdout_mutex);
+ pluginsd_function_progress_to_stdout(fqs->transaction, f + 1, files_used);
+ netdata_mutex_unlock(&stdout_mutex);
+ }
+
buffer_json_add_array_item_object(wb); // journal file
{
// information about the file
@@ -1422,15 +1429,6 @@ static void netdata_systemd_journal_function_help(const char *transaction) {
" all the available systemd journal sources.\n"
" When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n"
"\n"
- " "JOURNAL_PARAMETER_ID":STRING\n"
- " Caller supplied unique ID of the request.\n"
- " This can be used later to request a progress report of the query.\n"
- " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n"
- "\n"
- " "JOURNAL_PARAMETER_PROGRESS"\n"
- " Request a progress report (the `id` of a running query is required).\n"
- " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n"
- "\n"
" "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n"
" Quickly respond with data requested, without generating a\n"
" `histogram`, `facets` counters and `items`.\n"
@@ -1522,67 +1520,9 @@ static void netdata_systemd_journal_function_help(const char *transaction) {
buffer_free(wb);
}
-DICTIONARY *function_query_status_dict = NULL;
-
-static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) {
- if(!progress_id || !(*progress_id)) {
- netdata_mutex_lock(&stdout_mutex);
- pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id");
- netdata_mutex_unlock(&stdout_mutex);
- return;
- }
-
- const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id);
-
- if(!item) {
- netdata_mutex_lock(&stdout_mutex);
- pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here");
- netdata_mutex_unlock(&stdout_mutex);
- return;
- }
-
- FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item);
-
- usec_t now_monotonic_ut = now_monotonic_usec();
- if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut)
- fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC;
-
- usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut;
-
- size_t files_matched = fqs->files_matched;
- size_t file_working = fqs->file_working;
- if(file_working > files_matched)
- files_matched = file_working;
-
- size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED);
- size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED);
-
- buffer_flush(wb);
- buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
- buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK);
- buffer_json_member_add_string(wb, "type", "table");
- buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut);
- buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched);
- char msg[1024 + 1];
- snprintfz(msg, sizeof(msg) - 1,
- "Read %zu rows (%0.0f rows/s), "
- "data %0.1f MB (%0.1f MB/s), "
- "file %zu of %zu",
- rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC,
- (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0,
- file_working, files_matched
- );
- buffer_json_member_add_string(wb, "message", msg);
- buffer_json_finalize(wb);
-
- netdata_mutex_lock(&stdout_mutex);
- pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb);
- netdata_mutex_unlock(&stdout_mutex);
-
- dictionary_acquired_item_release(function_query_status_dict, item);
-}
-
-void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) {
+void function_systemd_journal(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled,
+ BUFFER *payload __maybe_unused, HTTP_ACCESS access __maybe_unused,
+ const char *source __maybe_unused, void *data __maybe_unused) {
fstat_thread_calls = 0;
fstat_thread_cached_responses = 0;
@@ -1590,14 +1530,11 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
buffer_flush(wb);
buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY);
- usec_t now_monotonic_ut = now_monotonic_usec();
FUNCTION_QUERY_STATUS tmp_fqs = {
.cancelled = cancelled,
- .started_monotonic_ut = now_monotonic_ut,
- .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC),
+ .stop_monotonic_ut = stop_monotonic_ut,
};
FUNCTION_QUERY_STATUS *fqs = NULL;
- const DICTIONARY_ITEM *fqs_item = NULL;
FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS,
SYSTEMD_ALWAYS_VISIBLE_KEYS,
@@ -1616,8 +1553,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM);
facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE);
facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY);
- facets_accepted_param(facets, JOURNAL_PARAMETER_ID);
- facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS);
facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA);
facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL);
facets_accepted_param(facets, JOURNAL_PARAMETER_SAMPLING);
@@ -1724,7 +1659,7 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
// ------------------------------------------------------------------------
// parse the parameters
- bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false;
+ bool info = false, data_only = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false;
time_t after_s = 0, before_s = 0;
usec_t anchor = 0;
usec_t if_modified_since = 0;
@@ -1733,7 +1668,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
const char *query = NULL;
const char *chart = NULL;
SIMPLE_PATTERN *sources = NULL;
- const char *progress_id = NULL;
SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL;
size_t filters = 0;
size_t sampling = SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING;
@@ -1753,9 +1687,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) {
info = true;
}
- else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) {
- progress = true;
- }
else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) {
char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1];
@@ -1791,12 +1722,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
else
slice = true;
}
- else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) {
- char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1];
-
- if(*id)
- progress_id = id;
- }
else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) {
const char *value = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1];
@@ -1930,15 +1855,7 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
// ------------------------------------------------------------------------
// put this request into the progress db
- if(progress_id && *progress_id) {
- fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs));
- fqs = dictionary_acquired_item_value(fqs_item);
- }
- else {
- // no progress id given, proceed without registering our progress in the dictionary
- fqs = &tmp_fqs;
- fqs_item = NULL;
- }
+ fqs = &tmp_fqs;
// ------------------------------------------------------------------------
// validate parameters
@@ -1969,6 +1886,7 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
// ------------------------------------------------------------------------
// set query time-frame, anchors and direction
+ fqs->transaction = transaction;
fqs->after_ut = after_s * USEC_PER_SEC;
fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1;
fqs->if_modified_since = if_modified_since;
@@ -2045,11 +1963,9 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false);
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice);
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only);
- buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false);
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta);
buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail);
buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_SAMPLING, fqs->sampling);
- buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id);
buffer_json_member_add_uint64(wb, "source_type", fqs->source_type);
buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC);
buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC);
@@ -2098,11 +2014,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo
goto output;
}
- if(progress) {
- function_systemd_journal_progress(wb, transaction, progress_id);
- goto cleanup;
- }
-
response = netdata_systemd_journal_query(wb, facets, fqs);
// ------------------------------------------------------------------------
@@ -2124,16 +2035,4 @@ cleanup:
simple_pattern_free(sources);
facets_destroy(facets);
buffer_free(wb);
-
- if(fqs_item) {
- dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item));
- dictionary_acquired_item_release(function_query_status_dict, fqs_item);
- dictionary_garbage_collect(function_query_status_dict);
- }
-}
-
-void journal_init_query_status(void) {
- function_query_status_dict = dictionary_create_advanced(
- DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
- NULL, sizeof(FUNCTION_QUERY_STATUS));
}