summaryrefslogtreecommitdiffstats
path: root/libnetdata/functions_evloop
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:03 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-05-05 12:08:18 +0000
commit5da14042f70711ea5cf66e034699730335462f66 (patch)
tree0f6354ccac934ed87a2d555f45be4c831cf92f4a /libnetdata/functions_evloop
parentReleasing debian version 1.44.3-2. (diff)
downloadnetdata-5da14042f70711ea5cf66e034699730335462f66.tar.xz
netdata-5da14042f70711ea5cf66e034699730335462f66.zip
Merging upstream version 1.45.3+dfsg.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'libnetdata/functions_evloop')
-rw-r--r--libnetdata/functions_evloop/Makefile.am8
-rw-r--r--libnetdata/functions_evloop/README.md0
-rw-r--r--libnetdata/functions_evloop/functions_evloop.c223
-rw-r--r--libnetdata/functions_evloop/functions_evloop.h101
4 files changed, 0 insertions, 332 deletions
diff --git a/libnetdata/functions_evloop/Makefile.am b/libnetdata/functions_evloop/Makefile.am
deleted file mode 100644
index 161784b8f..000000000
--- a/libnetdata/functions_evloop/Makefile.am
+++ /dev/null
@@ -1,8 +0,0 @@
-# SPDX-License-Identifier: GPL-3.0-or-later
-
-AUTOMAKE_OPTIONS = subdir-objects
-MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
-
-dist_noinst_DATA = \
- README.md \
- $(NULL)
diff --git a/libnetdata/functions_evloop/README.md b/libnetdata/functions_evloop/README.md
deleted file mode 100644
index e69de29bb..000000000
--- a/libnetdata/functions_evloop/README.md
+++ /dev/null
diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c
deleted file mode 100644
index 044556ac6..000000000
--- a/libnetdata/functions_evloop/functions_evloop.c
+++ /dev/null
@@ -1,223 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#include "functions_evloop.h"
-
-#define MAX_FUNCTION_PARAMETERS 1024
-
-struct functions_evloop_worker_job {
- bool used;
- bool running;
- bool cancelled;
- char *cmd;
- const char *transaction;
- time_t timeout;
- functions_evloop_worker_execute_t cb;
-};
-
-struct rrd_functions_expectation {
- const char *function;
- size_t function_length;
- functions_evloop_worker_execute_t cb;
- time_t default_timeout;
- struct rrd_functions_expectation *prev, *next;
-};
-
-struct functions_evloop_globals {
- const char *tag;
-
- DICTIONARY *worker_queue;
- pthread_mutex_t worker_mutex;
- pthread_cond_t worker_cond_var;
- size_t workers;
-
- netdata_mutex_t *stdout_mutex;
- bool *plugin_should_exit;
-
- netdata_thread_t reader_thread;
- netdata_thread_t *worker_threads;
-
- struct rrd_functions_expectation *expectations;
-};
-
-static void *rrd_functions_worker_globals_worker_main(void *arg) {
- struct functions_evloop_globals *wg = arg;
-
- bool last_acquired = true;
- while (true) {
- pthread_mutex_lock(&wg->worker_mutex);
-
- if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
- pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
-
- const DICTIONARY_ITEM *acquired = NULL;
- struct functions_evloop_worker_job *j;
- dfe_start_write(wg->worker_queue, j) {
- if(j->running || j->cancelled)
- continue;
-
- acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
- j->running = true;
- break;
- }
- dfe_done(j);
-
- pthread_mutex_unlock(&wg->worker_mutex);
-
- if(acquired) {
- ND_LOG_STACK lgs[] = {
- ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
- ND_LOG_FIELD_END(),
- };
- ND_LOG_STACK_PUSH(lgs);
-
- last_acquired = true;
- j = dictionary_acquired_item_value(acquired);
- j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
- dictionary_del(wg->worker_queue, j->transaction);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- dictionary_garbage_collect(wg->worker_queue);
- }
- else
- last_acquired = false;
- }
- return NULL;
-}
-
-static void *rrd_functions_worker_globals_reader_main(void *arg) {
- struct functions_evloop_globals *wg = arg;
-
- char buffer[PLUGINSD_LINE_MAX + 1];
-
- char *s = NULL;
- while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
-
- char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);
-
- const char *keyword = get_word(words, num_words, 0);
-
- if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
-
- if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
- netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
- keyword,
- transaction?transaction:"(unset)",
- timeout_s?timeout_s:"(unset)",
- function?function:"(unset)");
- }
- else {
- int timeout = str2i(timeout_s);
-
- bool found = false;
- struct rrd_functions_expectation *we;
- for(we = wg->expectations; we ;we = we->next) {
- if(strncmp(function, we->function, we->function_length) == 0) {
- struct functions_evloop_worker_job t = {
- .cmd = strdupz(function),
- .transaction = strdupz(transaction),
- .running = false,
- .cancelled = false,
- .timeout = timeout > 0 ? timeout : we->default_timeout,
- .used = false,
- .cb = we->cb,
- };
- struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
- if(j->used) {
- netdata_log_error("Received duplicate function transaction '%s'", transaction);
- freez((void *)t.cmd);
- freez((void *)t.transaction);
- }
- else {
- found = true;
- j->used = true;
- pthread_cond_signal(&wg->worker_cond_var);
- }
- }
- }
-
- if(!found) {
- netdata_mutex_lock(wg->stdout_mutex);
- pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
- "No function with this name found.");
- netdata_mutex_unlock(wg->stdout_mutex);
- }
- }
- }
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
- char *transaction = get_word(words, num_words, 1);
- const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
- if(acquired) {
- struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
- __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
- dictionary_acquired_item_release(wg->worker_queue, acquired);
- dictionary_del(wg->worker_queue, transaction);
- dictionary_garbage_collect(wg->worker_queue);
- }
- else
- netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
- }
- else
- netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
- }
-
- if(!s || feof(stdin) || ferror(stdin)) {
- *wg->plugin_should_exit = true;
- netdata_log_error("Received error on stdin.");
- }
-
- exit(1);
-}
-
-void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
- struct functions_evloop_worker_job *j = value;
- freez((void *)j->cmd);
- freez((void *)j->transaction);
-}
-
-struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
- struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
-
- wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
- dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
-
- pthread_mutex_init(&wg->worker_mutex, NULL);
- pthread_cond_init(&wg->worker_cond_var, NULL);
-
- wg->plugin_should_exit = plugin_should_exit;
- wg->stdout_mutex = stdout_mutex;
- wg->workers = worker_threads;
- wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
- wg->tag = tag;
-
- char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
- netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
- rrd_functions_worker_globals_reader_main, wg);
-
- for(size_t i = 0; i < wg->workers ; i++) {
- snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
- netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
- rrd_functions_worker_globals_worker_main, wg);
- }
-
- return wg;
-}
-
-void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
- struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
- we->function = function;
- we->function_length = strlen(we->function);
- we->cb = cb;
- we->default_timeout = default_timeout;
- DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
-}
-
-void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){
- for(size_t i = 0; i < wg->workers ; i++)
- netdata_thread_cancel(wg->worker_threads[i]);
-
- netdata_thread_cancel(wg->reader_thread);
-}
diff --git a/libnetdata/functions_evloop/functions_evloop.h b/libnetdata/functions_evloop/functions_evloop.h
deleted file mode 100644
index e5e83e95e..000000000
--- a/libnetdata/functions_evloop/functions_evloop.h
+++ /dev/null
@@ -1,101 +0,0 @@
-// SPDX-License-Identifier: GPL-3.0-or-later
-
-#ifndef NETDATA_FUNCTIONS_EVLOOP_H
-#define NETDATA_FUNCTIONS_EVLOOP_H
-
-#include "../libnetdata.h"
-
-#define PLUGINSD_KEYWORD_CHART "CHART"
-#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
-#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
-#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
-#define PLUGINSD_KEYWORD_SET "SET"
-#define PLUGINSD_KEYWORD_END "END"
-#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
-#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
-#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
-#define PLUGINSD_KEYWORD_LABEL "LABEL"
-#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
-#define PLUGINSD_KEYWORD_CLABEL "CLABEL"
-#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT"
-#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION"
-#define PLUGINSD_KEYWORD_FUNCTION_CANCEL "FUNCTION_CANCEL"
-#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN"
-#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END"
-
-#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
-#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
-#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
-#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
-#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
-#define PLUGINSD_KEYWORD_REPLAY_END "REND"
-
-#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
-#define PLUGINSD_KEYWORD_SET_V2 "SET2"
-#define PLUGINSD_KEYWORD_END_V2 "END2"
-
-#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
-#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
-#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
-#define PLUGINSD_KEYWORD_HOST "HOST"
-
-#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE"
-#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE"
-
-#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
-
-#define PLUGINSD_KEYWORD_EXIT "EXIT"
-
-#define PLUGINSD_KEYWORD_SLOT "SLOT" // to change the length of this, update pluginsd_extract_chart_slot() too
-
-#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
-
-typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, int timeout, bool *cancelled);
-struct functions_evloop_worker_job;
-struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit);
-void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout);
-void functions_evloop_cancel_threads(struct functions_evloop_globals *wg);
-
-
-#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
- buffer_sprintf(wb \
- , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
- , (transaction) ? (transaction) : "" \
- , (int)(code) \
- , (content_type) ? (content_type) : "" \
- , (long int)(expires) \
- )
-
-#define pluginsd_function_result_end_to_buffer(wb) \
- buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
-
-#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \
- fprintf(stdout \
- , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
- , (transaction) ? (transaction) : "" \
- , (int)(code) \
- , (content_type) ? (content_type) : "" \
- , (long int)(expires) \
- )
-
-#define pluginsd_function_result_end_to_stdout() \
- fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
-
-static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) {
- char buffer[PLUGINSD_LINE_MAX + 1];
- json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
-
- pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
- fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
- pluginsd_function_result_end_to_stdout();
- fflush(stdout);
-}
-
-static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
- pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
- fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
- pluginsd_function_result_end_to_stdout();
- fflush(stdout);
-}
-
-#endif //NETDATA_FUNCTIONS_EVLOOP_H