summaryrefslogtreecommitdiffstats
path: root/libnetdata/functions_evloop/functions_evloop.c
diff options
context:
space:
mode:
Diffstat (limited to 'libnetdata/functions_evloop/functions_evloop.c')
-rw-r--r--libnetdata/functions_evloop/functions_evloop.c223
1 files changed, 0 insertions, 223 deletions
diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c
deleted file mode 100644
index 044556ac6..000000000
--- a/libnetdata/functions_evloop/functions_evloop.c
+++ /dev/null
@@ -1,223 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "functions_evloop.h"
-
-#define MAX_FUNCTION_PARAMETERS 1024
-
-struct functions_evloop_worker_job {
- bool used;
- bool running;
- bool cancelled;
- char *cmd;
- const char *transaction;
- time_t timeout;
- functions_evloop_worker_execute_t cb;
-};
-
-struct rrd_functions_expectation {
- const char *function;
- size_t function_length;
- functions_evloop_worker_execute_t cb;
- time_t default_timeout;
- struct rrd_functions_expectation *prev, *next;
-};
-
-struct functions_evloop_globals {
- const char *tag;
-
- DICTIONARY *worker_queue;
- pthread_mutex_t worker_mutex;
- pthread_cond_t worker_cond_var;
- size_t workers;
-
- netdata_mutex_t *stdout_mutex;
- bool *plugin_should_exit;
-
- netdata_thread_t reader_thread;
- netdata_thread_t *worker_threads;
-
- struct rrd_functions_expectation *expectations;
-};
-
-static void *rrd_functions_worker_globals_worker_main(void *arg) {
- struct functions_evloop_globals *wg = arg;
-
- bool last_acquired = true;
- while (true) {
- pthread_mutex_lock(&wg->worker_mutex);
-
- if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
- pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
-
- const DICTIONARY_ITEM *acquired = NULL;
- struct functions_evloop_worker_job *j;
- dfe_start_write(wg->worker_queue, j) {
- if(j->running || j->cancelled)
- continue;
-
- acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
- j->running = true;
- break;
- }
- dfe_done(j);
-
- pthread_mutex_unlock(&wg->worker_mutex);
-
- if(acquired) {
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
-
- last_acquired = true;
- j = dictionary_acquired_item_value(acquired);
- j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
- dictionary_del(wg->worker_queue, j->transaction);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- dictionary_garbage_collect(wg->worker_queue);
- }
- else
- last_acquired = false;
- }
- return NULL;
-}
-
-static void *rrd_functions_worker_globals_reader_main(void *arg) {
- struct functions_evloop_globals *wg = arg;
-
- char buffer[PLUGINSD_LINE_MAX + 1];
-
- char *s = NULL;
- while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
-
- char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);
-
- const char *keyword = get_word(words, num_words, 0);
-
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- keyword,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
-
- bool found = false;
- struct rrd_functions_expectation *we;
- for(we = wg->expectations; we ;we = we->next) {
- if(strncmp(function, we->function, we->function_length) == 0) {
- struct functions_evloop_worker_job t = {
- .cmd = strdupz(function),
- .transaction = strdupz(transaction),
- .running = false,
- .cancelled = false,
- .timeout = timeout > 0 ? timeout : we->default_timeout,
- .used = false,
- .cb = we->cb,
- };
- struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
- if(j->used) {
- netdata_log_error("Received duplicate function transaction '%s'", transaction);
- freez((void *)t.cmd);
- freez((void *)t.transaction);
- }
- else {
- found = true;
- j->used = true;
- pthread_cond_signal(&wg->worker_cond_var);
- }
- }
- }
-
- if(!found) {
- netdata_mutex_lock(wg->stdout_mutex);
- pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
- "No function with this name found.");
- netdata_mutex_unlock(wg->stdout_mutex);
- }
- }
- }
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
- char *transaction = get_word(words, num_words, 1);
- const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
- if(acquired) {
- struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
- __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- dictionary_del(wg->worker_queue, transaction);
- dictionary_garbage_collect(wg->worker_queue);
- }
- else
- netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
- }
- else
- netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
- }
-
- if(!s || feof(stdin) || ferror(stdin)) {
- *wg->plugin_should_exit = true;
- netdata_log_error("Received error on stdin.");
- }
-
- exit(1);
-}
-
-void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
- struct functions_evloop_worker_job *j = value;
- freez((void *)j->cmd);
- freez((void *)j->transaction);
-}
-
-struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
- struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
-
- wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
- dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
-
- pthread_mutex_init(&wg->worker_mutex, NULL);
- pthread_cond_init(&wg->worker_cond_var, NULL);
-
- wg->plugin_should_exit = plugin_should_exit;
- wg->stdout_mutex = stdout_mutex;
- wg->workers = worker_threads;
- wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
- wg->tag = tag;
-
- char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
- netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
- rrd_functions_worker_globals_reader_main, wg);
-
- for(size_t i = 0; i < wg->workers ; i++) {
- snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
- netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
- rrd_functions_worker_globals_worker_main, wg);
- }
-
- return wg;
-}
-
-void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
- struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
- we->function = function;
- we->function_length = strlen(we->function);
- we->cb = cb;
- we->default_timeout = default_timeout;
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
-}
-
-void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){
- for(size_t i = 0; i < wg->workers ; i++)
- netdata_thread_cancel(wg->worker_threads[i]);
-
- netdata_thread_cancel(wg->reader_thread);
-}