summaryrefslogtreecommitdiffstats
path: root/src/libnetdata/functions_evloop
diff options
context:
space:
mode:
Diffstat (limited to 'src/libnetdata/functions_evloop')
-rw-r--r--src/libnetdata/functions_evloop/README.md0
-rw-r--r--src/libnetdata/functions_evloop/functions_evloop.c466
-rw-r--r--src/libnetdata/functions_evloop/functions_evloop.h156
3 files changed, 622 insertions, 0 deletions
diff --git a/src/libnetdata/functions_evloop/README.md b/src/libnetdata/functions_evloop/README.md
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/src/libnetdata/functions_evloop/README.md
diff --git a/src/libnetdata/functions_evloop/functions_evloop.c b/src/libnetdata/functions_evloop/functions_evloop.c
new file mode 100644
index 000000000..5000d038f
--- /dev/null
+++ b/src/libnetdata/functions_evloop/functions_evloop.c
@@ -0,0 +1,466 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "functions_evloop.h"
+
+static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut,
+ bool *cancelled, BUFFER *payload, HTTP_ACCESS access,
+ const char *source, void *data);
+
+struct functions_evloop_worker_job {
+ bool used;
+ bool running;
+ bool cancelled;
+ usec_t stop_monotonic_ut;
+ char *cmd;
+ const char *transaction;
+ time_t timeout;
+
+ BUFFER *payload;
+ HTTP_ACCESS access;
+ const char *source;
+
+ functions_evloop_worker_execute_t cb;
+ void *cb_data;
+};
+
+static void worker_job_cleanup(struct functions_evloop_worker_job *j) {
+ freez((void *)j->cmd);
+ freez((void *)j->transaction);
+ freez((void *)j->source);
+ buffer_free(j->payload);
+}
+
+struct rrd_functions_expectation {
+ const char *function;
+ size_t function_length;
+ functions_evloop_worker_execute_t cb;
+ void *cb_data;
+ time_t default_timeout;
+ struct rrd_functions_expectation *prev, *next;
+};
+
+struct functions_evloop_globals {
+ const char *tag;
+
+ DICTIONARY *worker_queue;
+ pthread_mutex_t worker_mutex;
+ pthread_cond_t worker_cond_var;
+ size_t workers;
+
+ netdata_mutex_t *stdout_mutex;
+ bool *plugin_should_exit;
+ bool workers_exit; // all workers are waiting on the same condition - this makes them all exit, when any is cancelled
+
+ ND_THREAD *reader_thread;
+ ND_THREAD **worker_threads;
+
+ struct {
+ DICTIONARY *nodes;
+ } dyncfg;
+
+ struct rrd_functions_expectation *expectations;
+};
+
+static void rrd_functions_worker_canceller(void *data) {
+ struct functions_evloop_globals *wg = data;
+ pthread_mutex_lock(&wg->worker_mutex);
+ wg->workers_exit = true;
+ pthread_cond_signal(&wg->worker_cond_var);
+ pthread_mutex_unlock(&wg->worker_mutex);
+}
+
+static void *rrd_functions_worker_globals_worker_main(void *arg) {
+ struct functions_evloop_globals *wg = arg;
+
+ nd_thread_register_canceller(rrd_functions_worker_canceller, wg);
+
+ bool last_acquired = true;
+ while (true) {
+ pthread_mutex_lock(&wg->worker_mutex);
+
+ if(wg->workers_exit || nd_thread_signaled_to_cancel()) {
+ pthread_mutex_unlock(&wg->worker_mutex);
+ break;
+ }
+
+ if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired)
+ pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex);
+
+ const DICTIONARY_ITEM *acquired = NULL;
+ struct functions_evloop_worker_job *j;
+ dfe_start_write(wg->worker_queue, j) {
+ if(j->running || j->cancelled)
+ continue;
+
+ acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item);
+ j->running = true;
+ break;
+ }
+ dfe_done(j);
+
+ pthread_mutex_unlock(&wg->worker_mutex);
+
+ if(wg->workers_exit || nd_thread_signaled_to_cancel()) {
+ if(acquired)
+ dictionary_acquired_item_release(wg->worker_queue, acquired);
+
+ break;
+ }
+
+ if(acquired) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ last_acquired = true;
+ j = dictionary_acquired_item_value(acquired);
+ j->cb(j->transaction, j->cmd, &j->stop_monotonic_ut, &j->cancelled, j->payload, j->access, j->source, j->cb_data);
+ dictionary_del(wg->worker_queue, j->transaction);
+ dictionary_acquired_item_release(wg->worker_queue, acquired);
+ dictionary_garbage_collect(wg->worker_queue);
+ }
+ else
+ last_acquired = false;
+ }
+
+ return NULL;
+}
+
+static void worker_add_job(struct functions_evloop_globals *wg, const char *keyword, char *transaction, char *function, char *timeout_s, BUFFER *payload, const char *access, const char *source) {
+ if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
+ nd_log(NDLS_COLLECTORS, NDLP_ERR, "Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
+ keyword,
+ transaction?transaction:"(unset)",
+ timeout_s?timeout_s:"(unset)",
+ function?function:"(unset)");
+ }
+ else {
+ int timeout = str2i(timeout_s);
+
+ const char *msg = "No function with this name found";
+ bool found = false;
+ struct rrd_functions_expectation *we;
+ for(we = wg->expectations; we ;we = we->next) {
+ if(strncmp(function, we->function, we->function_length) == 0) {
+ if(timeout <= 0)
+ timeout = (int)we->default_timeout;
+
+ struct functions_evloop_worker_job t = {
+ .cmd = strdupz(function),
+ .transaction = strdupz(transaction),
+ .running = false,
+ .cancelled = false,
+ .timeout = timeout,
+ .stop_monotonic_ut = now_monotonic_usec() + (timeout * USEC_PER_SEC),
+ .used = false,
+ .payload = buffer_dup(payload),
+ .access = http_access_from_hex(access),
+ .source = source ? strdupz(source) : NULL,
+ .cb = we->cb,
+ .cb_data = we->cb_data,
+ };
+ struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t));
+ if(j->used) {
+ nd_log(NDLS_COLLECTORS, NDLP_WARNING, "Received duplicate function transaction '%s'. Ignoring it.", transaction);
+ worker_job_cleanup(&t);
+ msg = "Duplicate function transaction. Ignoring it.";
+ }
+ else {
+ found = true;
+ j->used = true;
+ pthread_mutex_lock(&wg->worker_mutex);
+ pthread_cond_signal(&wg->worker_cond_var);
+ pthread_mutex_unlock(&wg->worker_mutex);
+ }
+ }
+ }
+
+ if(!found) {
+ netdata_mutex_lock(wg->stdout_mutex);
+ pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, msg);
+ netdata_mutex_unlock(wg->stdout_mutex);
+ }
+ }
+}
+
+static void *rrd_functions_worker_globals_reader_main(void *arg) {
+ struct functions_evloop_globals *wg = arg;
+
+ struct {
+ size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc...
+ bool enabled;
+ char *transaction;
+ char *function;
+ char *timeout_s;
+ char *access;
+ char *source;
+ char *content_type;
+ } deferred = { 0 };
+
+ struct buffered_reader reader = { 0 };
+ buffered_reader_init(&reader);
+ BUFFER *buffer = buffer_create(sizeof(reader.read_buffer) + 2, NULL);
+
+ while(!(*wg->plugin_should_exit)) {
+ if(unlikely(!buffered_reader_next_line(&reader, buffer))) {
+ buffered_reader_ret_t ret = buffered_reader_read_timeout(
+ &reader,
+ fileno((FILE *)stdin),
+ 2 * 60 * MSEC_PER_SEC,
+ false
+ );
+
+ if(unlikely(ret != BUFFERED_READER_READ_OK && ret != BUFFERED_READER_READ_POLL_TIMEOUT))
+ break;
+
+ continue;
+ }
+
+ if(deferred.enabled) {
+ char *s = (char *)buffer_tostring(buffer);
+
+ if(strstr(&s[deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) {
+ if(deferred.last_len > 0)
+ // remove the trailing newline from the buffer
+ deferred.last_len--;
+
+ s[deferred.last_len] = '\0';
+ buffer->len = deferred.last_len;
+ buffer->content_type = content_type_string2id(deferred.content_type);
+ worker_add_job(wg,
+ PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN, deferred.transaction, deferred.function,
+ deferred.timeout_s, buffer, deferred.access, deferred.source);
+ buffer_flush(buffer);
+
+ freez(deferred.transaction);
+ freez(deferred.function);
+ freez(deferred.timeout_s);
+ freez(deferred.access);
+ freez(deferred.source);
+ freez(deferred.content_type);
+ memset(&deferred, 0, sizeof(deferred));
+ }
+ else
+ deferred.last_len = buffer->len;
+
+ continue;
+ }
+
+ char *words[MAX_FUNCTION_PARAMETERS] = { NULL };
+ size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS);
+
+ const char *keyword = get_word(words, num_words, 0);
+
+ if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) {
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+ char *access = get_word(words, num_words, 4);
+ char *source = get_word(words, num_words, 5);
+ worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source);
+ }
+ else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) {
+ char *transaction = get_word(words, num_words, 1);
+ char *timeout_s = get_word(words, num_words, 2);
+ char *function = get_word(words, num_words, 3);
+ char *access = get_word(words, num_words, 4);
+ char *source = get_word(words, num_words, 5);
+ char *content_type = get_word(words, num_words, 6);
+
+ deferred.transaction = strdupz(transaction ? transaction : "");
+ deferred.timeout_s = strdupz(timeout_s ? timeout_s : "");
+ deferred.function = strdupz(function ? function : "");
+ deferred.access = strdupz(access ? access : "");
+ deferred.source = strdupz(source ? source : "");
+ deferred.content_type = strdupz(content_type ? content_type : "");
+ deferred.last_len = 0;
+ deferred.enabled = true;
+ }
+ else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
+ if(acquired) {
+ struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
+ __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED);
+ dictionary_acquired_item_release(wg->worker_queue, acquired);
+ dictionary_del(wg->worker_queue, transaction);
+ dictionary_garbage_collect(wg->worker_queue);
+ }
+ else
+ nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction);
+ }
+ else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) {
+ char *transaction = get_word(words, num_words, 1);
+ const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction);
+ if(acquired) {
+ struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired);
+
+ functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut);
+
+ dictionary_acquired_item_release(wg->worker_queue, acquired);
+ }
+ else
+ nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction);
+ }
+ else
+ nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)");
+
+ buffer_flush(buffer);
+ }
+
+ if(!(*wg->plugin_should_exit))
+ nd_log(NDLS_COLLECTORS, NDLP_ERR, "Read error on stdin");
+
+ *wg->plugin_should_exit = true;
+ exit(1);
+}
+
+void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
+ struct functions_evloop_worker_job *j = value;
+ worker_job_cleanup(j);
+}
+
+struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) {
+ struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals));
+
+ wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
+ dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL);
+
+ wg->dyncfg.nodes = dyncfg_nodes_dictionary_create();
+
+ pthread_mutex_init(&wg->worker_mutex, NULL);
+ pthread_cond_init(&wg->worker_cond_var, NULL);
+
+ wg->plugin_should_exit = plugin_should_exit;
+ wg->stdout_mutex = stdout_mutex;
+ wg->workers = worker_threads;
+ wg->worker_threads = callocz(wg->workers, sizeof(ND_THREAD *));
+ wg->tag = tag;
+
+ char tag_buffer[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag);
+ wg->reader_thread = nd_thread_create(tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
+ rrd_functions_worker_globals_reader_main, wg);
+
+ for(size_t i = 0; i < wg->workers ; i++) {
+ snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1);
+ wg->worker_threads[i] = nd_thread_create(tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG,
+ rrd_functions_worker_globals_worker_main, wg);
+ }
+
+ functions_evloop_add_function(wg, "config", functions_evloop_config_cb, 120, wg);
+
+ return wg;
+}
+
+void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data) {
+ struct rrd_functions_expectation *we = callocz(1, sizeof(*we));
+ we->function = function;
+ we->function_length = strlen(we->function);
+ we->cb = cb;
+ we->cb_data = data;
+ we->default_timeout = default_timeout;
+ DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next);
+}
+
+void functions_evloop_cancel_threads(struct functions_evloop_globals *wg) {
+ nd_thread_signal_cancel(wg->reader_thread);
+
+ for(size_t i = 0; i < wg->workers ; i++)
+ nd_thread_signal_cancel(wg->worker_threads[i]);
+}
+
+// ----------------------------------------------------------------------------
+
+static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled,
+ BUFFER *payload, HTTP_ACCESS access, const char *source, void *data) {
+ struct functions_evloop_globals *wg = data;
+
+ CLEAN_BUFFER *result = buffer_create(1024, NULL);
+ int code = dyncfg_node_find_and_call(wg->dyncfg.nodes, transaction, function, stop_monotonic_ut,
+ cancelled, payload, access, source, result);
+
+ netdata_mutex_lock(wg->stdout_mutex);
+ pluginsd_function_result_begin_to_stdout(transaction, code, content_type_id2string(result->content_type), result->expires);
+ printf("%s", buffer_tostring(result));
+ pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
+ netdata_mutex_unlock(wg->stdout_mutex);
+}
+
+void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path,
+ DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type,
+ const char *source, DYNCFG_CMDS cmds,
+ HTTP_ACCESS view_access, HTTP_ACCESS edit_access,
+ dyncfg_cb_t cb, void *data) {
+
+ if(!dyncfg_is_valid_id(id)) {
+ nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
+ return;
+ }
+
+ struct dyncfg_node tmp = {
+ .cmds = cmds,
+ .type = type,
+ .cb = cb,
+ .data = data,
+ };
+ dictionary_set(wg->dyncfg.nodes, id, &tmp, sizeof(tmp));
+
+ CLEAN_BUFFER *c = buffer_create(100, NULL);
+ dyncfg_cmds2buffer(cmds, c);
+
+ netdata_mutex_lock(wg->stdout_mutex);
+
+ fprintf(stdout,
+ PLUGINSD_KEYWORD_CONFIG " '%s' " PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE " '%s' '%s' '%s' '%s' '%s' '%s' "HTTP_ACCESS_FORMAT" "HTTP_ACCESS_FORMAT"\n",
+ id,
+ dyncfg_id2status(status),
+ dyncfg_id2type(type), path,
+ dyncfg_id2source_type(source_type),
+ source,
+ buffer_tostring(c),
+ (HTTP_ACCESS_FORMAT_CAST)view_access,
+ (HTTP_ACCESS_FORMAT_CAST)edit_access
+ );
+ fflush(stdout);
+
+ netdata_mutex_unlock(wg->stdout_mutex);
+}
+
+void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id) {
+ if(!dyncfg_is_valid_id(id)) {
+ nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
+ return;
+ }
+
+ dictionary_del(wg->dyncfg.nodes, id);
+
+ netdata_mutex_lock(wg->stdout_mutex);
+
+ fprintf(stdout,
+ PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "\n",
+ id);
+ fflush(stdout);
+
+ netdata_mutex_unlock(wg->stdout_mutex);
+}
+
+void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status) {
+ if(!dyncfg_is_valid_id(id)) {
+ nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id);
+ return;
+ }
+
+ netdata_mutex_lock(wg->stdout_mutex);
+
+ fprintf(stdout,
+ PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS " %s\n",
+ id, dyncfg_id2status(status));
+
+ fflush(stdout);
+
+ netdata_mutex_unlock(wg->stdout_mutex);
+}
diff --git a/src/libnetdata/functions_evloop/functions_evloop.h b/src/libnetdata/functions_evloop/functions_evloop.h
new file mode 100644
index 000000000..5c575bd17
--- /dev/null
+++ b/src/libnetdata/functions_evloop/functions_evloop.h
@@ -0,0 +1,156 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_FUNCTIONS_EVLOOP_H
+#define NETDATA_FUNCTIONS_EVLOOP_H
+
+#include "../libnetdata.h"
+
+#define MAX_FUNCTION_PARAMETERS 1024
+#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds
+
+// plugins.d 1st version of the external plugins and streaming protocol
+#define PLUGINSD_KEYWORD_CHART "CHART"
+#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END"
+#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION"
+#define PLUGINSD_KEYWORD_BEGIN "BEGIN"
+#define PLUGINSD_KEYWORD_SET "SET"
+#define PLUGINSD_KEYWORD_END "END"
+#define PLUGINSD_KEYWORD_FLUSH "FLUSH"
+#define PLUGINSD_KEYWORD_DISABLE "DISABLE"
+#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE"
+#define PLUGINSD_KEYWORD_LABEL "LABEL"
+#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE"
+#define PLUGINSD_KEYWORD_CLABEL "CLABEL"
+#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT"
+#define PLUGINSD_KEYWORD_EXIT "EXIT"
+
+// high-speed versions of BEGIN, SET, END
+#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2"
+#define PLUGINSD_KEYWORD_SET_V2 "SET2"
+#define PLUGINSD_KEYWORD_END_V2 "END2"
+
+// super high-speed versions of BEGIN, SET, END have this as first parameter
+// enabled with the streaming capability STREAM_CAP_SLOTS
+#define PLUGINSD_KEYWORD_SLOT "SLOT" // to change the length of this, update pluginsd_extract_chart_slot() too
+
+// virtual hosts (only for external plugins - for streaming virtual hosts are like all other hosts)
+#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE"
+#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END"
+#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL"
+#define PLUGINSD_KEYWORD_HOST "HOST"
+
+// replication
+// enabled with STREAM_CAP_REPLICATION
+#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART"
+#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN"
+#define PLUGINSD_KEYWORD_REPLAY_SET "RSET"
+#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE"
+#define PLUGINSD_KEYWORD_REPLAY_END "REND"
+
+// plugins.d accepts these for functions (from external plugins or streaming children)
+// related to STREAM_CAP_FUNCTIONS, STREAM_CAP_PROGRESS
+#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" // define a function
+#define PLUGINSD_KEYWORD_FUNCTION_PROGRESS "FUNCTION_PROGRESS" // send updates about function progress
+#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" // the result of a function transaction
+#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" // the end of the result of a func. trans.
+
+// plugins.d sends these for functions (to external plugins or streaming children)
+// related to STREAM_CAP_FUNCTIONS, STREAM_CAP_PROGRESS
+#define PLUGINSD_CALL_FUNCTION "FUNCTION" // call a function to a plugin or remote host
+#define PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN "FUNCTION_PAYLOAD" // call a function with a payload
+#define PLUGINSD_CALL_FUNCTION_PAYLOAD_END "FUNCTION_PAYLOAD_END" // function payload ends
+#define PLUGINSD_CALL_FUNCTION_CANCEL "FUNCTION_CANCEL" // cancel a running function transaction
+#define PLUGINSD_CALL_FUNCTION_PROGRESS "FUNCTION_PROGRESS" // let the function know the user is waiting
+
+// dyncfg
+// enabled with STREAM_CAP_DYNCFG
+#define PLUGINSD_KEYWORD_CONFIG "CONFIG"
+#define PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE "create"
+#define PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "delete"
+#define PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS "status"
+#define PLUGINSD_FUNCTION_CONFIG "config"
+
+typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, usec_t *stop_monotonic_ut,
+ bool *cancelled, BUFFER *payload, HTTP_ACCESS access,
+ const char *source, void *data);
+
+struct functions_evloop_worker_job;
+struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit);
+void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data);
+void functions_evloop_cancel_threads(struct functions_evloop_globals *wg);
+
+#define FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT (10 * USEC_PER_SEC)
+static inline void functions_stop_monotonic_update_on_progress(usec_t *stop_monotonic_ut) {
+ usec_t now_ut = now_monotonic_usec();
+ if(now_ut + FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT > *stop_monotonic_ut) {
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Extending function timeout due to PROGRESS update...");
+ __atomic_store_n(stop_monotonic_ut, now_ut + FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT, __ATOMIC_RELAXED);
+ }
+ else
+ nd_log(NDLS_DAEMON, NDLP_DEBUG, "Received PROGRESS update...");
+}
+
+#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \
+ buffer_sprintf(wb \
+ , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
+ , (transaction) ? (transaction) : "" \
+ , (int)(code) \
+ , (content_type) ? (content_type) : "" \
+ , (long int)(expires) \
+ )
+
+#define pluginsd_function_result_end_to_buffer(wb) \
+ buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+
+#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \
+ fprintf(stdout \
+ , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \
+ , (transaction) ? (transaction) : "" \
+ , (int)(code) \
+ , (content_type) ? (content_type) : "" \
+ , (long int)(expires) \
+ )
+
+#define pluginsd_function_result_end_to_stdout() \
+ fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n")
+
+static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) {
+ char buffer[PLUGINSD_LINE_MAX + 1];
+ json_escape_string(buffer, msg, PLUGINSD_LINE_MAX);
+
+ pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec());
+ fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer);
+ pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
+}
+
+static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) {
+ pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires);
+ fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout);
+ pluginsd_function_result_end_to_stdout();
+ fflush(stdout);
+}
+
+static inline void pluginsd_function_progress_to_stdout(const char *transaction, size_t done, size_t all) {
+ fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION_PROGRESS " '%s' %zu %zu\n",
+ transaction, done, all);
+ fflush(stdout);
+}
+
+static inline void send_newline_and_flush(pthread_mutex_t *mutex) {
+ netdata_mutex_lock(mutex);
+ fprintf(stdout, "\n");
+ fflush(stdout);
+ netdata_mutex_unlock(mutex);
+}
+
+void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path,
+ DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, const char *source, DYNCFG_CMDS cmds,
+ HTTP_ACCESS view_access, HTTP_ACCESS edit_access,
+ dyncfg_cb_t cb, void *data);
+
+void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id);
+void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status);
+
+#endif //NETDATA_FUNCTIONS_EVLOOP_H