diff options
Diffstat (limited to '')
-rw-r--r-- | src/collectors/systemd-journal.plugin/systemd-journal.c (renamed from collectors/systemd-journal.plugin/systemd-journal.c) | 157 |
1 files changed, 28 insertions, 129 deletions
diff --git a/collectors/systemd-journal.plugin/systemd-journal.c b/src/collectors/systemd-journal.plugin/systemd-journal.c index f812b2161..84f644575 100644 --- a/collectors/systemd-journal.plugin/systemd-journal.c +++ b/src/collectors/systemd-journal.plugin/systemd-journal.c @@ -26,6 +26,8 @@ #define SYSTEMD_JOURNAL_SAMPLING_SLOTS 1000 #define SYSTEMD_JOURNAL_SAMPLING_RECALIBRATE 10000 +#define SYSTEMD_JOURNAL_PROGRESS_EVERY_UT (250 * USEC_PER_MS) + #define JOURNAL_PARAMETER_HELP "help" #define JOURNAL_PARAMETER_AFTER "after" #define JOURNAL_PARAMETER_BEFORE "before" @@ -39,8 +41,6 @@ #define JOURNAL_PARAMETER_DATA_ONLY "data_only" #define JOURNAL_PARAMETER_SOURCE "source" #define JOURNAL_PARAMETER_INFO "info" -#define JOURNAL_PARAMETER_ID "id" -#define JOURNAL_PARAMETER_PROGRESS "progress" #define JOURNAL_PARAMETER_SLICE "slice" #define JOURNAL_PARAMETER_DELTA "delta" #define JOURNAL_PARAMETER_TAIL "tail" @@ -183,11 +183,11 @@ typedef struct function_query_status { bool *cancelled; // a pointer to the cancelling boolean - usec_t stop_monotonic_ut; - - usec_t started_monotonic_ut; + usec_t *stop_monotonic_ut; // request + const char *transaction; + SD_JOURNAL_FILE_SOURCE_TYPE source_type; SIMPLE_PATTERN *sources; usec_t after_ut; @@ -518,12 +518,12 @@ static size_t sampling_running_file_query_estimate_remaining_lines_by_time(FUNCT return remaining_logs_by_time; } -static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { - size_t expected_matching_logs_by_seqnum = 0; - double proportion_by_seqnum = 0.0; +static size_t sampling_running_file_query_estimate_remaining_lines(sd_journal *j __maybe_unused, FUNCTION_QUERY_STATUS *fqs, struct journal_file *jf, FACETS_ANCHOR_DIRECTION direction, usec_t msg_ut) { size_t remaining_logs_by_seqnum = 0; #ifdef HAVE_SD_JOURNAL_GET_SEQNUM + size_t expected_matching_logs_by_seqnum = 0; + double proportion_by_seqnum = 0.0; uint64_t current_msg_seqnum; sd_id128_t current_msg_writer; if(!fqs->query_file.first_msg_seqnum || sd_journal_get_seqnum(j, ¤t_msg_seqnum, ¤t_msg_writer) < 0) { @@ -807,7 +807,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_backward( FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); last_bytes = bytes; - status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + status = check_stop(fqs->cancelled, fqs->stop_monotonic_ut); } } else if(sample == SAMPLING_SKIP_FIELDS) @@ -914,7 +914,7 @@ ND_SD_JOURNAL_STATUS netdata_systemd_journal_query_forward( FUNCTION_PROGRESS_UPDATE_BYTES(fqs->bytes_read, bytes - last_bytes); last_bytes = bytes; - status = check_stop(fqs->cancelled, &fqs->stop_monotonic_ut); + status = check_stop(fqs->cancelled, fqs->stop_monotonic_ut); } } else if(sample == SAMPLING_SKIP_FIELDS) @@ -1085,7 +1085,7 @@ static bool jf_is_mine(struct journal_file *jf, FUNCTION_QUERY_STATUS *fqs) { if((fqs->source_type == SDJF_NONE && !fqs->sources) || (jf->source_type & fqs->source_type) || (fqs->sources && simple_pattern_matches(fqs->sources, string2str(jf->source)))) { - if(!jf->msg_last_ut || !jf->msg_last_ut) + if(!jf->msg_last_ut) // the file is not scanned yet, or the timestamps have not been updated, // so we don't know if it can contribute or not - let's add it. return true; @@ -1150,6 +1150,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU usec_t started_ut = query_started_ut; usec_t ended_ut = started_ut; usec_t duration_ut = 0, max_duration_ut = 0; + usec_t progress_duration_ut = 0; sampling_query_init(fqs, facets); @@ -1164,9 +1165,7 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU started_ut = ended_ut; // do not even try to do the query if we expect it to pass the timeout - if(ended_ut > (query_started_ut + (fqs->stop_monotonic_ut - query_started_ut) * 3 / 4) && - ended_ut + max_duration_ut * 2 >= fqs->stop_monotonic_ut) { - + if(ended_ut + max_duration_ut * 3 >= *fqs->stop_monotonic_ut) { partial = true; status = ND_SD_JOURNAL_TIMED_OUT; break; @@ -1211,6 +1210,14 @@ static int netdata_systemd_journal_query(BUFFER *wb, FACETS *facets, FUNCTION_QU if(duration_ut > max_duration_ut) max_duration_ut = duration_ut; + progress_duration_ut += duration_ut; + if(progress_duration_ut >= SYSTEMD_JOURNAL_PROGRESS_EVERY_UT) { + progress_duration_ut = 0; + netdata_mutex_lock(&stdout_mutex); + pluginsd_function_progress_to_stdout(fqs->transaction, f + 1, files_used); + netdata_mutex_unlock(&stdout_mutex); + } + buffer_json_add_array_item_object(wb); // journal file { // information about the file @@ -1422,15 +1429,6 @@ static void netdata_systemd_journal_function_help(const char *transaction) { " all the available systemd journal sources.\n" " When `"JOURNAL_PARAMETER_INFO"` is requested, all other parameters are ignored.\n" "\n" - " "JOURNAL_PARAMETER_ID":STRING\n" - " Caller supplied unique ID of the request.\n" - " This can be used later to request a progress report of the query.\n" - " Optional, but if omitted no `"JOURNAL_PARAMETER_PROGRESS"` can be requested.\n" - "\n" - " "JOURNAL_PARAMETER_PROGRESS"\n" - " Request a progress report (the `id` of a running query is required).\n" - " When `"JOURNAL_PARAMETER_PROGRESS"` is requested, only parameter `"JOURNAL_PARAMETER_ID"` is used.\n" - "\n" " "JOURNAL_PARAMETER_DATA_ONLY":true or "JOURNAL_PARAMETER_DATA_ONLY":false\n" " Quickly respond with data requested, without generating a\n" " `histogram`, `facets` counters and `items`.\n" @@ -1522,67 +1520,9 @@ static void netdata_systemd_journal_function_help(const char *transaction) { buffer_free(wb); } -DICTIONARY *function_query_status_dict = NULL; - -static void function_systemd_journal_progress(BUFFER *wb, const char *transaction, const char *progress_id) { - if(!progress_id || !(*progress_id)) { - netdata_mutex_lock(&stdout_mutex); - pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_BAD_REQUEST, "missing progress id"); - netdata_mutex_unlock(&stdout_mutex); - return; - } - - const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(function_query_status_dict, progress_id); - - if(!item) { - netdata_mutex_lock(&stdout_mutex); - pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, "progress id is not found here"); - netdata_mutex_unlock(&stdout_mutex); - return; - } - - FUNCTION_QUERY_STATUS *fqs = dictionary_acquired_item_value(item); - - usec_t now_monotonic_ut = now_monotonic_usec(); - if(now_monotonic_ut + 10 * USEC_PER_SEC > fqs->stop_monotonic_ut) - fqs->stop_monotonic_ut = now_monotonic_ut + 10 * USEC_PER_SEC; - - usec_t duration_ut = now_monotonic_ut - fqs->started_monotonic_ut; - - size_t files_matched = fqs->files_matched; - size_t file_working = fqs->file_working; - if(file_working > files_matched) - files_matched = file_working; - - size_t rows_read = __atomic_load_n(&fqs->rows_read, __ATOMIC_RELAXED); - size_t bytes_read = __atomic_load_n(&fqs->bytes_read, __ATOMIC_RELAXED); - - buffer_flush(wb); - buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); - buffer_json_member_add_uint64(wb, "status", HTTP_RESP_OK); - buffer_json_member_add_string(wb, "type", "table"); - buffer_json_member_add_uint64(wb, "running_duration_usec", duration_ut); - buffer_json_member_add_double(wb, "progress", (double)file_working * 100.0 / (double)files_matched); - char msg[1024 + 1]; - snprintfz(msg, sizeof(msg) - 1, - "Read %zu rows (%0.0f rows/s), " - "data %0.1f MB (%0.1f MB/s), " - "file %zu of %zu", - rows_read, (double)rows_read / (double)duration_ut * (double)USEC_PER_SEC, - (double)bytes_read / 1024.0 / 1024.0, ((double)bytes_read / (double)duration_ut * (double)USEC_PER_SEC) / 1024.0 / 1024.0, - file_working, files_matched - ); - buffer_json_member_add_string(wb, "message", msg); - buffer_json_finalize(wb); - - netdata_mutex_lock(&stdout_mutex); - pluginsd_function_result_to_stdout(transaction, HTTP_RESP_OK, "application/json", now_realtime_sec() + 1, wb); - netdata_mutex_unlock(&stdout_mutex); - - dictionary_acquired_item_release(function_query_status_dict, item); -} - -void function_systemd_journal(const char *transaction, char *function, int timeout, bool *cancelled) { +void function_systemd_journal(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, + BUFFER *payload __maybe_unused, HTTP_ACCESS access __maybe_unused, + const char *source __maybe_unused, void *data __maybe_unused) { fstat_thread_calls = 0; fstat_thread_cached_responses = 0; @@ -1590,14 +1530,11 @@ void function_systemd_journal(const char *transaction, char *function, int timeo buffer_flush(wb); buffer_json_initialize(wb, "\"", "\"", 0, true, BUFFER_JSON_OPTIONS_MINIFY); - usec_t now_monotonic_ut = now_monotonic_usec(); FUNCTION_QUERY_STATUS tmp_fqs = { .cancelled = cancelled, - .started_monotonic_ut = now_monotonic_ut, - .stop_monotonic_ut = now_monotonic_ut + (timeout * USEC_PER_SEC), + .stop_monotonic_ut = stop_monotonic_ut, }; FUNCTION_QUERY_STATUS *fqs = NULL; - const DICTIONARY_ITEM *fqs_item = NULL; FACETS *facets = facets_create(50, FACETS_OPTION_ALL_KEYS_FTS, SYSTEMD_ALWAYS_VISIBLE_KEYS, @@ -1616,8 +1553,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo facets_accepted_param(facets, JOURNAL_PARAMETER_HISTOGRAM); facets_accepted_param(facets, JOURNAL_PARAMETER_IF_MODIFIED_SINCE); facets_accepted_param(facets, JOURNAL_PARAMETER_DATA_ONLY); - facets_accepted_param(facets, JOURNAL_PARAMETER_ID); - facets_accepted_param(facets, JOURNAL_PARAMETER_PROGRESS); facets_accepted_param(facets, JOURNAL_PARAMETER_DELTA); facets_accepted_param(facets, JOURNAL_PARAMETER_TAIL); facets_accepted_param(facets, JOURNAL_PARAMETER_SAMPLING); @@ -1724,7 +1659,7 @@ void function_systemd_journal(const char *transaction, char *function, int timeo // ------------------------------------------------------------------------ // parse the parameters - bool info = false, data_only = false, progress = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false; + bool info = false, data_only = false, slice = JOURNAL_DEFAULT_SLICE_MODE, delta = false, tail = false; time_t after_s = 0, before_s = 0; usec_t anchor = 0; usec_t if_modified_since = 0; @@ -1733,7 +1668,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo const char *query = NULL; const char *chart = NULL; SIMPLE_PATTERN *sources = NULL; - const char *progress_id = NULL; SD_JOURNAL_FILE_SOURCE_TYPE source_type = SDJF_ALL; size_t filters = 0; size_t sampling = SYSTEMD_JOURNAL_DEFAULT_ITEMS_SAMPLING; @@ -1753,9 +1687,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo else if(strcmp(keyword, JOURNAL_PARAMETER_INFO) == 0) { info = true; } - else if(strcmp(keyword, JOURNAL_PARAMETER_PROGRESS) == 0) { - progress = true; - } else if(strncmp(keyword, JOURNAL_PARAMETER_DELTA ":", sizeof(JOURNAL_PARAMETER_DELTA ":") - 1) == 0) { char *v = &keyword[sizeof(JOURNAL_PARAMETER_DELTA ":") - 1]; @@ -1791,12 +1722,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo else slice = true; } - else if(strncmp(keyword, JOURNAL_PARAMETER_ID ":", sizeof(JOURNAL_PARAMETER_ID ":") - 1) == 0) { - char *id = &keyword[sizeof(JOURNAL_PARAMETER_ID ":") - 1]; - - if(*id) - progress_id = id; - } else if(strncmp(keyword, JOURNAL_PARAMETER_SOURCE ":", sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1) == 0) { const char *value = &keyword[sizeof(JOURNAL_PARAMETER_SOURCE ":") - 1]; @@ -1930,15 +1855,7 @@ void function_systemd_journal(const char *transaction, char *function, int timeo // ------------------------------------------------------------------------ // put this request into the progress db - if(progress_id && *progress_id) { - fqs_item = dictionary_set_and_acquire_item(function_query_status_dict, progress_id, &tmp_fqs, sizeof(tmp_fqs)); - fqs = dictionary_acquired_item_value(fqs_item); - } - else { - // no progress id given, proceed without registering our progress in the dictionary - fqs = &tmp_fqs; - fqs_item = NULL; - } + fqs = &tmp_fqs; // ------------------------------------------------------------------------ // validate parameters @@ -1969,6 +1886,7 @@ void function_systemd_journal(const char *transaction, char *function, int timeo // ------------------------------------------------------------------------ // set query time-frame, anchors and direction + fqs->transaction = transaction; fqs->after_ut = after_s * USEC_PER_SEC; fqs->before_ut = (before_s * USEC_PER_SEC) + USEC_PER_SEC - 1; fqs->if_modified_since = if_modified_since; @@ -2045,11 +1963,9 @@ void function_systemd_journal(const char *transaction, char *function, int timeo buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_INFO, false); buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_SLICE, fqs->slice); buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DATA_ONLY, fqs->data_only); - buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_PROGRESS, false); buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_DELTA, fqs->delta); buffer_json_member_add_boolean(wb, JOURNAL_PARAMETER_TAIL, fqs->tail); buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_SAMPLING, fqs->sampling); - buffer_json_member_add_string(wb, JOURNAL_PARAMETER_ID, progress_id); buffer_json_member_add_uint64(wb, "source_type", fqs->source_type); buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_AFTER, fqs->after_ut / USEC_PER_SEC); buffer_json_member_add_uint64(wb, JOURNAL_PARAMETER_BEFORE, fqs->before_ut / USEC_PER_SEC); @@ -2098,11 +2014,6 @@ void function_systemd_journal(const char *transaction, char *function, int timeo goto output; } - if(progress) { - function_systemd_journal_progress(wb, transaction, progress_id); - goto cleanup; - } - response = netdata_systemd_journal_query(wb, facets, fqs); // ------------------------------------------------------------------------ @@ -2124,16 +2035,4 @@ cleanup: simple_pattern_free(sources); facets_destroy(facets); buffer_free(wb); - - if(fqs_item) { - dictionary_del(function_query_status_dict, dictionary_acquired_item_name(fqs_item)); - dictionary_acquired_item_release(function_query_status_dict, fqs_item); - dictionary_garbage_collect(function_query_status_dict); - } -} - -void journal_init_query_status(void) { - function_query_status_dict = dictionary_create_advanced( - DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, - NULL, sizeof(FUNCTION_QUERY_STATUS)); } |