diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:03 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-05 12:08:18 +0000 |
commit | 5da14042f70711ea5cf66e034699730335462f66 (patch) | |
tree | 0f6354ccac934ed87a2d555f45be4c831cf92f4a /libnetdata/functions_evloop | |
parent | Releasing debian version 1.44.3-2. (diff) | |
download | netdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz netdata-5da14042f70711ea5cf66e034699730335462f66.zip |
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'libnetdata/functions_evloop')
-rw-r--r-- | libnetdata/functions_evloop/Makefile.am | 8 | ||||
-rw-r--r-- | libnetdata/functions_evloop/README.md | 0 | ||||
-rw-r--r-- | libnetdata/functions_evloop/functions_evloop.c | 223 | ||||
-rw-r--r-- | libnetdata/functions_evloop/functions_evloop.h | 101 |
4 files changed, 0 insertions, 332 deletions
diff --git a/libnetdata/functions_evloop/Makefile.am b/libnetdata/functions_evloop/Makefile.am deleted file mode 100644 index 161784b8f..000000000 --- a/libnetdata/functions_evloop/Makefile.am +++ /dev/null @@ -1,8 +0,0 @@ -# SPDX-License-Identifier: GPL-3.0-or-later - -AUTOMAKE_OPTIONS = subdir-objects -MAINTAINERCLEANFILES = $(srcdir)/Makefile.in - -dist_noinst_DATA = \ - README.md \ - $(NULL) diff --git a/libnetdata/functions_evloop/README.md b/libnetdata/functions_evloop/README.md deleted file mode 100644 index e69de29bb..000000000 --- a/libnetdata/functions_evloop/README.md +++ /dev/null diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c deleted file mode 100644 index 044556ac6..000000000 --- a/libnetdata/functions_evloop/functions_evloop.c +++ /dev/null @@ -1,223 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#include "functions_evloop.h" - -#define MAX_FUNCTION_PARAMETERS 1024 - -struct functions_evloop_worker_job { - bool used; - bool running; - bool cancelled; - char *cmd; - const char *transaction; - time_t timeout; - functions_evloop_worker_execute_t cb; -}; - -struct rrd_functions_expectation { - const char *function; - size_t function_length; - functions_evloop_worker_execute_t cb; - time_t default_timeout; - struct rrd_functions_expectation *prev, *next; -}; - -struct functions_evloop_globals { - const char *tag; - - DICTIONARY *worker_queue; - pthread_mutex_t worker_mutex; - pthread_cond_t worker_cond_var; - size_t workers; - - netdata_mutex_t *stdout_mutex; - bool *plugin_should_exit; - - netdata_thread_t reader_thread; - netdata_thread_t *worker_threads; - - struct rrd_functions_expectation *expectations; -}; - -static void *rrd_functions_worker_globals_worker_main(void *arg) { - struct functions_evloop_globals *wg = arg; - - bool last_acquired = true; - while (true) { - pthread_mutex_lock(&wg->worker_mutex); - - if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired) - pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex); - - const DICTIONARY_ITEM *acquired = NULL; - struct functions_evloop_worker_job *j; - dfe_start_write(wg->worker_queue, j) { - if(j->running || j->cancelled) - continue; - - acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); - j->running = true; - break; - } - dfe_done(j); - - pthread_mutex_unlock(&wg->worker_mutex); - - if(acquired) { - ND_LOG_STACK lgs[] = { - ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd), - ND_LOG_FIELD_END(), - }; - ND_LOG_STACK_PUSH(lgs); - - last_acquired = true; - j = dictionary_acquired_item_value(acquired); - j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled); - dictionary_del(wg->worker_queue, j->transaction); - dictionary_acquired_item_release(wg->worker_queue, acquired); - dictionary_garbage_collect(wg->worker_queue); - } - else - last_acquired = false; - } - return NULL; -} - -static void *rrd_functions_worker_globals_reader_main(void *arg) { - struct functions_evloop_globals *wg = arg; - - char buffer[PLUGINSD_LINE_MAX + 1]; - - char *s = NULL; - while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { - - char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS); - - const char *keyword = get_word(words, num_words, 0); - - if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); - - if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { - netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", - keyword, - transaction?transaction:"(unset)", - timeout_s?timeout_s:"(unset)", - function?function:"(unset)"); - } - else { - int timeout = str2i(timeout_s); - - bool found = false; - struct rrd_functions_expectation *we; - for(we = wg->expectations; we ;we = we->next) { - if(strncmp(function, we->function, we->function_length) == 0) { - struct functions_evloop_worker_job t = { - .cmd = strdupz(function), - .transaction = strdupz(transaction), - .running = false, - .cancelled = false, - .timeout = timeout > 0 ? timeout : we->default_timeout, - .used = false, - .cb = we->cb, - }; - struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t)); - if(j->used) { - netdata_log_error("Received duplicate function transaction '%s'", transaction); - freez((void *)t.cmd); - freez((void *)t.transaction); - } - else { - found = true; - j->used = true; - pthread_cond_signal(&wg->worker_cond_var); - } - } - } - - if(!found) { - netdata_mutex_lock(wg->stdout_mutex); - pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, - "No function with this name found."); - netdata_mutex_unlock(wg->stdout_mutex); - } - } - } - else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { - char *transaction = get_word(words, num_words, 1); - const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); - if(acquired) { - struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); - __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); - dictionary_acquired_item_release(wg->worker_queue, acquired); - dictionary_del(wg->worker_queue, transaction); - dictionary_garbage_collect(wg->worker_queue); - } - else - netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction); - } - else - netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); - } - - if(!s || feof(stdin) || ferror(stdin)) { - *wg->plugin_should_exit = true; - netdata_log_error("Received error on stdin."); - } - - exit(1); -} - -void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { - struct functions_evloop_worker_job *j = value; - freez((void *)j->cmd); - freez((void *)j->transaction); -} - -struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) { - struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals)); - - wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); - dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL); - - pthread_mutex_init(&wg->worker_mutex, NULL); - pthread_cond_init(&wg->worker_cond_var, NULL); - - wg->plugin_should_exit = plugin_should_exit; - wg->stdout_mutex = stdout_mutex; - wg->workers = worker_threads; - wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t )); - wg->tag = tag; - - char tag_buffer[NETDATA_THREAD_TAG_MAX + 1]; - snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag); - netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, - rrd_functions_worker_globals_reader_main, wg); - - for(size_t i = 0; i < wg->workers ; i++) { - snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1); - netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, - rrd_functions_worker_globals_worker_main, wg); - } - - return wg; -} - -void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) { - struct rrd_functions_expectation *we = callocz(1, sizeof(*we)); - we->function = function; - we->function_length = strlen(we->function); - we->cb = cb; - we->default_timeout = default_timeout; - DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next); -} - -void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){ - for(size_t i = 0; i < wg->workers ; i++) - netdata_thread_cancel(wg->worker_threads[i]); - - netdata_thread_cancel(wg->reader_thread); -} diff --git a/libnetdata/functions_evloop/functions_evloop.h b/libnetdata/functions_evloop/functions_evloop.h deleted file mode 100644 index e5e83e95e..000000000 --- a/libnetdata/functions_evloop/functions_evloop.h +++ /dev/null @@ -1,101 +0,0 @@ -// SPDX-License-Identifier: GPL-3.0-or-later - -#ifndef NETDATA_FUNCTIONS_EVLOOP_H -#define NETDATA_FUNCTIONS_EVLOOP_H - -#include "../libnetdata.h" - -#define PLUGINSD_KEYWORD_CHART "CHART" -#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" -#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" -#define PLUGINSD_KEYWORD_BEGIN "BEGIN" -#define PLUGINSD_KEYWORD_SET "SET" -#define PLUGINSD_KEYWORD_END "END" -#define PLUGINSD_KEYWORD_FLUSH "FLUSH" -#define PLUGINSD_KEYWORD_DISABLE "DISABLE" -#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" -#define PLUGINSD_KEYWORD_LABEL "LABEL" -#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" -#define PLUGINSD_KEYWORD_CLABEL "CLABEL" -#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT" -#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" -#define PLUGINSD_KEYWORD_FUNCTION_CANCEL "FUNCTION_CANCEL" -#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" -#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" - -#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" -#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" -#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" -#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" -#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" -#define PLUGINSD_KEYWORD_REPLAY_END "REND" - -#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" -#define PLUGINSD_KEYWORD_SET_V2 "SET2" -#define PLUGINSD_KEYWORD_END_V2 "END2" - -#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" -#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" -#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" -#define PLUGINSD_KEYWORD_HOST "HOST" - -#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE" -#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE" - -#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS" - -#define PLUGINSD_KEYWORD_EXIT "EXIT" - -#define PLUGINSD_KEYWORD_SLOT "SLOT" // to change the length of this, update pluginsd_extract_chart_slot() too - -#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds - -typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, int timeout, bool *cancelled); -struct functions_evloop_worker_job; -struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit); -void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout); -void functions_evloop_cancel_threads(struct functions_evloop_globals *wg); - - -#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \ - buffer_sprintf(wb \ - , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ - , (transaction) ? (transaction) : "" \ - , (int)(code) \ - , (content_type) ? (content_type) : "" \ - , (long int)(expires) \ - ) - -#define pluginsd_function_result_end_to_buffer(wb) \ - buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") - -#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \ - fprintf(stdout \ - , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ - , (transaction) ? (transaction) : "" \ - , (int)(code) \ - , (content_type) ? (content_type) : "" \ - , (long int)(expires) \ - ) - -#define pluginsd_function_result_end_to_stdout() \ - fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") - -static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) { - char buffer[PLUGINSD_LINE_MAX + 1]; - json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); - - pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); - fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); - pluginsd_function_result_end_to_stdout(); - fflush(stdout); -} - -static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) { - pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires); - fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout); - pluginsd_function_result_end_to_stdout(); - fflush(stdout); -} - -#endif //NETDATA_FUNCTIONS_EVLOOP_H |