diff options
Diffstat (limited to 'libnetdata/functions_evloop/functions_evloop.c')
-rw-r--r-- | libnetdata/functions_evloop/functions_evloop.c | 223 |
1 files changed, 223 insertions, 0 deletions
diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c new file mode 100644 index 00000000..044556ac --- /dev/null +++ b/libnetdata/functions_evloop/functions_evloop.c @@ -0,0 +1,223 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "functions_evloop.h" + +#define MAX_FUNCTION_PARAMETERS 1024 + +struct functions_evloop_worker_job { + bool used; + bool running; + bool cancelled; + char *cmd; + const char *transaction; + time_t timeout; + functions_evloop_worker_execute_t cb; +}; + +struct rrd_functions_expectation { + const char *function; + size_t function_length; + functions_evloop_worker_execute_t cb; + time_t default_timeout; + struct rrd_functions_expectation *prev, *next; +}; + +struct functions_evloop_globals { + const char *tag; + + DICTIONARY *worker_queue; + pthread_mutex_t worker_mutex; + pthread_cond_t worker_cond_var; + size_t workers; + + netdata_mutex_t *stdout_mutex; + bool *plugin_should_exit; + + netdata_thread_t reader_thread; + netdata_thread_t *worker_threads; + + struct rrd_functions_expectation *expectations; +}; + +static void *rrd_functions_worker_globals_worker_main(void *arg) { + struct functions_evloop_globals *wg = arg; + + bool last_acquired = true; + while (true) { + pthread_mutex_lock(&wg->worker_mutex); + + if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired) + pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex); + + const DICTIONARY_ITEM *acquired = NULL; + struct functions_evloop_worker_job *j; + dfe_start_write(wg->worker_queue, j) { + if(j->running || j->cancelled) + continue; + + acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); + j->running = true; + break; + } + dfe_done(j); + + pthread_mutex_unlock(&wg->worker_mutex); + + if(acquired) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + last_acquired = true; + j = dictionary_acquired_item_value(acquired); + j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled); + dictionary_del(wg->worker_queue, j->transaction); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_garbage_collect(wg->worker_queue); + } + else + last_acquired = false; + } + return NULL; +} + +static void *rrd_functions_worker_globals_reader_main(void *arg) { + struct functions_evloop_globals *wg = arg; + + char buffer[PLUGINSD_LINE_MAX + 1]; + + char *s = NULL; + while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) { + + char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS); + + const char *keyword = get_word(words, num_words, 0); + + if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + else { + int timeout = str2i(timeout_s); + + bool found = false; + struct rrd_functions_expectation *we; + for(we = wg->expectations; we ;we = we->next) { + if(strncmp(function, we->function, we->function_length) == 0) { + struct functions_evloop_worker_job t = { + .cmd = strdupz(function), + .transaction = strdupz(transaction), + .running = false, + .cancelled = false, + .timeout = timeout > 0 ? timeout : we->default_timeout, + .used = false, + .cb = we->cb, + }; + struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t)); + if(j->used) { + netdata_log_error("Received duplicate function transaction '%s'", transaction); + freez((void *)t.cmd); + freez((void *)t.transaction); + } + else { + found = true; + j->used = true; + pthread_cond_signal(&wg->worker_cond_var); + } + } + } + + if(!found) { + netdata_mutex_lock(wg->stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, + "No function with this name found."); + netdata_mutex_unlock(wg->stdout_mutex); + } + } + } + else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + char *transaction = get_word(words, num_words, 1); + const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); + if(acquired) { + struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); + __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_del(wg->worker_queue, transaction); + dictionary_garbage_collect(wg->worker_queue); + } + else + netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction); + } + else + netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)"); + } + + if(!s || feof(stdin) || ferror(stdin)) { + *wg->plugin_should_exit = true; + netdata_log_error("Received error on stdin."); + } + + exit(1); +} + +void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct functions_evloop_worker_job *j = value; + freez((void *)j->cmd); + freez((void *)j->transaction); +} + +struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) { + struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals)); + + wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL); + + pthread_mutex_init(&wg->worker_mutex, NULL); + pthread_cond_init(&wg->worker_cond_var, NULL); + + wg->plugin_should_exit = plugin_should_exit; + wg->stdout_mutex = stdout_mutex; + wg->workers = worker_threads; + wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t )); + wg->tag = tag; + + char tag_buffer[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag); + netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, + rrd_functions_worker_globals_reader_main, wg); + + for(size_t i = 0; i < wg->workers ; i++) { + snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1); + netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, + rrd_functions_worker_globals_worker_main, wg); + } + + return wg; +} + +void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) { + struct rrd_functions_expectation *we = callocz(1, sizeof(*we)); + we->function = function; + we->function_length = strlen(we->function); + we->cb = cb; + we->default_timeout = default_timeout; + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next); +} + +void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){ + for(size_t i = 0; i < wg->workers ; i++) + netdata_thread_cancel(wg->worker_threads[i]); + + netdata_thread_cancel(wg->reader_thread); +} |