summaryrefslogtreecommitdiffstats
path: root/libnetdata/functions_evloop
diff options
context:
space:
mode:
Diffstat (limited to 'libnetdata/functions_evloop')
-rw-r--r--libnetdata/functions_evloop/Makefile.am8
-rw-r--r--libnetdata/functions_evloop/README.md0
-rw-r--r--libnetdata/functions_evloop/functions_evloop.c223
-rw-r--r--libnetdata/functions_evloop/functions_evloop.h101
4 files changed, 332 insertions, 0 deletions
diff --git a/libnetdata/functions_evloop/Makefile.am b/libnetdata/functions_evloop/Makefile.am
new file mode 100644
index 00000000..161784b8
--- /dev/null
+++ b/libnetdata/functions_evloop/Makefile.am
@@ -0,0 +1,8 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
diff --git a/libnetdata/functions_evloop/README.md b/libnetdata/functions_evloop/README.md
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/libnetdata/functions_evloop/README.md
diff --git a/libnetdata/functions_evloop/functions_evloop.c b/libnetdata/functions_evloop/functions_evloop.c
new file mode 100644
index 00000000..044556ac
--- /dev/null
+++ b/libnetdata/functions_evloop/functions_evloop.c
@@ -0,0 +1,223 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "functions_evloop.h"
+
+#define MAX_FUNCTION_PARAMETERS 1024
+
+struct functions_evloop_worker_job {
+ bool used;
+ bool running;
+ bool cancelled;
+ char *cmd;
+ const char *transaction;
+ time_t timeout;
+ functions_evloop_worker_execute_t cb;
+};
+
+struct rrd_functions_expectation {
+ const char *function;
+ size_t function_length;
+ functions_evloop_worker_execute_t cb;
+ time_t default_timeout;
+ struct rrd_functions_expectation *prev, *next;
+};
+
+struct functions_evloop_globals {
+ const char *tag;
+
+ DICTIONARY *worker_queue;
+ pthread_mutex_t worker_mutex;
+ pthread_cond_t worker_cond_var;
+ size_t workers;
+
+ netdata_mutex_t *stdout_mutex;
+ bool *plugin_should_exit;
+
+ netdata_thread_t reader_thread;
+ netdata_thread_t *worker_threads;
+
+ struct rrd_functions_expectation *expectations;
+};
+
+static void *rrd_functions_worker_globals_worker_main(void *arg) {
+ struct functions_evloop_globals *wg = arg;
+
+ bool last_acquired = true;
+ while (true) {
+ pthread_mutex_lock(&wg->worker_mutex);
+
+ if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
+ pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
+
+ const DICTIONARY_ITEM *acquired = NULL;
+ struct functions_evloop_worker_job *j;
+ dfe_start_write(wg->worker_queue, j) {
+ if(j->running || j->cancelled)
+ continue;
+
+ acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
+ j->running = true;
+ break;
+ }
+ dfe_done(j);
+
+ pthread_mutex_unlock(&wg->worker_mutex);
+
+ if(acquired) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ last_acquired = true;
+ j = dictionary_acquired_item_value(acquired);
+ j->cb(j->transaction, j->cmd, j->timeout, &j->cancelled);
+ dictionary_del(wg->worker_queue, j->transaction);
+ dictionary_acquired_item_release(wg->worker_queue, acquired);
+ dictionary_garbage_collect(wg->worker_queue);
+ }
+ else
+ last_acquired = false;
+ }
+ return NULL;
+}
+
+static void *rrd_functions_worker_globals_reader_main(void *arg) {
+ struct functions_evloop_globals *wg = arg;
+
+ char buffer[PLUGINSD_LINE_MAX + 1];
+
+ char *s = NULL;
+ while(!(*wg->plugin_should_exit) && (s = fgets(buffer, PLUGINSD_LINE_MAX, stdin))) {
+
+ char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
+ size_t num_words = quoted_strings_splitter_pluginsd(buffer, words, MAX_FUNCTION_PARAMETERS);
+
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ netdata_log_error("Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+
+ bool found = false;
+ struct rrd_functions_expectation *we;
+ for(we = wg->expectations; we ;we = we->next) {
+ if(strncmp(function, we->function, we->function_length) == 0) {
+ struct functions_evloop_worker_job t = {
+ .cmd = strdupz(function),
+ .transaction = strdupz(transaction),
+ .running = false,
+ .cancelled = false,
+ .timeout = timeout > 0 ? timeout : we->default_timeout,
+ .used = false,
+ .cb = we->cb,
+ };
+ struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
+ if(j->used) {
+ netdata_log_error("Received duplicate function transaction '%s'", transaction);
+ freez((void *)t.cmd);
+ freez((void *)t.transaction);
+ }
+ else {
+ found = true;
+ j->used = true;
+ pthread_cond_signal(&wg->worker_cond_var);
+ }
+ }
+ }
+
+ if(!found) {
+ netdata_mutex_lock(wg->stdout_mutex);
+ pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND,
+ "No function with this name found.");
+ netdata_mutex_unlock(wg->stdout_mutex);
+ }
+ }
+ }
+ else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
+ if(acquired) {
+ struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
+ __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
+ dictionary_acquired_item_release(wg->worker_queue, acquired);
+ dictionary_del(wg->worker_queue, transaction);
+ dictionary_garbage_collect(wg->worker_queue);
+ }
+ else
+ netdata_log_error("Received CANCEL for transaction '%s', but it not available here", transaction);
+ }
+ else
+ netdata_log_error("Received unknown command: %s", keyword?keyword:"(unset)");
+ }
+
+ if(!s || feof(stdin) || ferror(stdin)) {
+ *wg->plugin_should_exit = true;
+ netdata_log_error("Received error on stdin.");
+ }
+
+ exit(1);
+}
+
+void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
+ struct functions_evloop_worker_job *j = value;
+ freez((void *)j->cmd);
+ freez((void *)j->transaction);
+}
+
+struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
+ struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
+
+ wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
+
+ pthread_mutex_init(&wg->worker_mutex, NULL);
+ pthread_cond_init(&wg->worker_cond_var, NULL);
+
+ wg->plugin_should_exit = plugin_should_exit;
+ wg->stdout_mutex = stdout_mutex;
+ wg->workers = worker_threads;
+ wg->worker_threads = callocz(wg->workers, sizeof(netdata_thread_t ));
+ wg->tag = tag;
+
+ char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
+ netdata_thread_create(&wg->reader_thread, tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
+ rrd_functions_worker_globals_reader_main, wg);
+
+ for(size_t i = 0; i < wg->workers ; i++) {
+ snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
+ netdata_thread_create(&wg->worker_threads[i], tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
+ rrd_functions_worker_globals_worker_main, wg);
+ }
+
+ return wg;
+}
+
+void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout) {
+ struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
+ we->function = function;
+ we->function_length = strlen(we->function);
+ we->cb = cb;
+ we->default_timeout = default_timeout;
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
+}
+
+void functions_evloop_cancel_threads(struct functions_evloop_globals *wg){
+ for(size_t i = 0; i < wg->workers ; i++)
+ netdata_thread_cancel(wg->worker_threads[i]);
+
+ netdata_thread_cancel(wg->reader_thread);
+}
diff --git a/libnetdata/functions_evloop/functions_evloop.h b/libnetdata/functions_evloop/functions_evloop.h
new file mode 100644
index 00000000..e5e83e95
--- /dev/null
+++ b/libnetdata/functions_evloop/functions_evloop.h
@@ -0,0 +1,101 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_FUNCTIONS_EVLOOP_H
+#define NETDATA_FUNCTIONS_EVLOOP_H
+
+#include "../libnetdata.h"
+
+#define PLUGINSD_KEYWORD_CHART "CHART"
+#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
+#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
+#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
+#define PLUGINSD_KEYWORD_SET "SET"
+#define PLUGINSD_KEYWORD_END "END"
+#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
+#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
+#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
+#define PLUGINSD_KEYWORD_LABEL "LABEL"
+#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
+#define PLUGINSD_KEYWORD_CLABEL "CLABEL"
+#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT"
+#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION"
+#define PLUGINSD_KEYWORD_FUNCTION_CANCEL "FUNCTION_CANCEL"
+#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN"
+#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END"
+
+#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
+#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
+#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
+#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_END "REND"
+
+#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
+#define PLUGINSD_KEYWORD_SET_V2 "SET2"
+#define PLUGINSD_KEYWORD_END_V2 "END2"
+
+#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
+#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
+#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
+#define PLUGINSD_KEYWORD_HOST "HOST"
+
+#define PLUGINSD_KEYWORD_DYNCFG_ENABLE "DYNCFG_ENABLE"
+#define PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE "DYNCFG_REGISTER_MODULE"
+
+#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
+
+#define PLUGINSD_KEYWORD_EXIT "EXIT"
+
+#define PLUGINSD_KEYWORD_SLOT "SLOT" // to change the length of this, update pluginsd_extract_chart_slot() too
+
+#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
+
+typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, int timeout, bool *cancelled);
+struct functions_evloop_worker_job;
+struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit);
+void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout);
+void functions_evloop_cancel_threads(struct functions_evloop_globals *wg);
+
+
+#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
+ buffer_sprintf(wb \
+ , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
+ , (transaction) ? (transaction) : "" \
+ , (int)(code) \
+ , (content_type) ? (content_type) : "" \
+ , (long int)(expires) \
+ )
+
+#define pluginsd_function_result_end_to_buffer(wb) \
+ buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+
+#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \
+ fprintf(stdout \
+ , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
+ , (transaction) ? (transaction) : "" \
+ , (int)(code) \
+ , (content_type) ? (content_type) : "" \
+ , (long int)(expires) \
+ )
+
+#define pluginsd_function_result_end_to_stdout() \
+ fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+
+static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) {
+ char buffer[PLUGINSD_LINE_MAX + 1];
+ json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
+
+ pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
+ fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
+ pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
+}
+
+static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
+ pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
+ fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
+ pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
+}
+
+#endif //NETDATA_FUNCTIONS_EVLOOP_H