diff options
Diffstat (limited to 'src/libnetdata/functions_evloop')
-rw-r--r-- | src/libnetdata/functions_evloop/README.md | 0 | ||||
-rw-r--r-- | src/libnetdata/functions_evloop/functions_evloop.c | 466 | ||||
-rw-r--r-- | src/libnetdata/functions_evloop/functions_evloop.h | 156 |
3 files changed, 622 insertions, 0 deletions
diff --git a/src/libnetdata/functions_evloop/README.md b/src/libnetdata/functions_evloop/README.md new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/src/libnetdata/functions_evloop/README.md diff --git a/src/libnetdata/functions_evloop/functions_evloop.c b/src/libnetdata/functions_evloop/functions_evloop.c new file mode 100644 index 000000000..5000d038f --- /dev/null +++ b/src/libnetdata/functions_evloop/functions_evloop.c @@ -0,0 +1,466 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "functions_evloop.h" + +static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, + bool *cancelled, BUFFER *payload, HTTP_ACCESS access, + const char *source, void *data); + +struct functions_evloop_worker_job { + bool used; + bool running; + bool cancelled; + usec_t stop_monotonic_ut; + char *cmd; + const char *transaction; + time_t timeout; + + BUFFER *payload; + HTTP_ACCESS access; + const char *source; + + functions_evloop_worker_execute_t cb; + void *cb_data; +}; + +static void worker_job_cleanup(struct functions_evloop_worker_job *j) { + freez((void *)j->cmd); + freez((void *)j->transaction); + freez((void *)j->source); + buffer_free(j->payload); +} + +struct rrd_functions_expectation { + const char *function; + size_t function_length; + functions_evloop_worker_execute_t cb; + void *cb_data; + time_t default_timeout; + struct rrd_functions_expectation *prev, *next; +}; + +struct functions_evloop_globals { + const char *tag; + + DICTIONARY *worker_queue; + pthread_mutex_t worker_mutex; + pthread_cond_t worker_cond_var; + size_t workers; + + netdata_mutex_t *stdout_mutex; + bool *plugin_should_exit; + bool workers_exit; // all workers are waiting on the same condition - this makes them all exit, when any is cancelled + + ND_THREAD *reader_thread; + ND_THREAD **worker_threads; + + struct { + DICTIONARY *nodes; + } dyncfg; + + struct rrd_functions_expectation *expectations; +}; + +static void rrd_functions_worker_canceller(void *data) { + struct functions_evloop_globals *wg = data; + pthread_mutex_lock(&wg->worker_mutex); + wg->workers_exit = true; + pthread_cond_signal(&wg->worker_cond_var); + pthread_mutex_unlock(&wg->worker_mutex); +} + +static void *rrd_functions_worker_globals_worker_main(void *arg) { + struct functions_evloop_globals *wg = arg; + + nd_thread_register_canceller(rrd_functions_worker_canceller, wg); + + bool last_acquired = true; + while (true) { + pthread_mutex_lock(&wg->worker_mutex); + + if(wg->workers_exit || nd_thread_signaled_to_cancel()) { + pthread_mutex_unlock(&wg->worker_mutex); + break; + } + + if(dictionary_entries(wg->worker_queue) == 0 || !last_acquired) + pthread_cond_wait(&wg->worker_cond_var, &wg->worker_mutex); + + const DICTIONARY_ITEM *acquired = NULL; + struct functions_evloop_worker_job *j; + dfe_start_write(wg->worker_queue, j) { + if(j->running || j->cancelled) + continue; + + acquired = dictionary_acquired_item_dup(wg->worker_queue, j_dfe.item); + j->running = true; + break; + } + dfe_done(j); + + pthread_mutex_unlock(&wg->worker_mutex); + + if(wg->workers_exit || nd_thread_signaled_to_cancel()) { + if(acquired) + dictionary_acquired_item_release(wg->worker_queue, acquired); + + break; + } + + if(acquired) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_REQUEST, j->cmd), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + last_acquired = true; + j = dictionary_acquired_item_value(acquired); + j->cb(j->transaction, j->cmd, &j->stop_monotonic_ut, &j->cancelled, j->payload, j->access, j->source, j->cb_data); + dictionary_del(wg->worker_queue, j->transaction); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_garbage_collect(wg->worker_queue); + } + else + last_acquired = false; + } + + return NULL; +} + +static void worker_add_job(struct functions_evloop_globals *wg, const char *keyword, char *transaction, char *function, char *timeout_s, BUFFER *payload, const char *access, const char *source) { + if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { + nd_log(NDLS_COLLECTORS, NDLP_ERR, "Received incomplete %s (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", + keyword, + transaction?transaction:"(unset)", + timeout_s?timeout_s:"(unset)", + function?function:"(unset)"); + } + else { + int timeout = str2i(timeout_s); + + const char *msg = "No function with this name found"; + bool found = false; + struct rrd_functions_expectation *we; + for(we = wg->expectations; we ;we = we->next) { + if(strncmp(function, we->function, we->function_length) == 0) { + if(timeout <= 0) + timeout = (int)we->default_timeout; + + struct functions_evloop_worker_job t = { + .cmd = strdupz(function), + .transaction = strdupz(transaction), + .running = false, + .cancelled = false, + .timeout = timeout, + .stop_monotonic_ut = now_monotonic_usec() + (timeout * USEC_PER_SEC), + .used = false, + .payload = buffer_dup(payload), + .access = http_access_from_hex(access), + .source = source ? strdupz(source) : NULL, + .cb = we->cb, + .cb_data = we->cb_data, + }; + struct functions_evloop_worker_job *j = dictionary_set(wg->worker_queue, transaction, &t, sizeof(t)); + if(j->used) { + nd_log(NDLS_COLLECTORS, NDLP_WARNING, "Received duplicate function transaction '%s'. Ignoring it.", transaction); + worker_job_cleanup(&t); + msg = "Duplicate function transaction. Ignoring it."; + } + else { + found = true; + j->used = true; + pthread_mutex_lock(&wg->worker_mutex); + pthread_cond_signal(&wg->worker_cond_var); + pthread_mutex_unlock(&wg->worker_mutex); + } + } + } + + if(!found) { + netdata_mutex_lock(wg->stdout_mutex); + pluginsd_function_json_error_to_stdout(transaction, HTTP_RESP_NOT_FOUND, msg); + netdata_mutex_unlock(wg->stdout_mutex); + } + } +} + +static void *rrd_functions_worker_globals_reader_main(void *arg) { + struct functions_evloop_globals *wg = arg; + + struct { + size_t last_len; // to remember the last pos - do not use a pointer, the buffer may realloc... + bool enabled; + char *transaction; + char *function; + char *timeout_s; + char *access; + char *source; + char *content_type; + } deferred = { 0 }; + + struct buffered_reader reader = { 0 }; + buffered_reader_init(&reader); + BUFFER *buffer = buffer_create(sizeof(reader.read_buffer) + 2, NULL); + + while(!(*wg->plugin_should_exit)) { + if(unlikely(!buffered_reader_next_line(&reader, buffer))) { + buffered_reader_ret_t ret = buffered_reader_read_timeout( + &reader, + fileno((FILE *)stdin), + 2 * 60 * MSEC_PER_SEC, + false + ); + + if(unlikely(ret != BUFFERED_READER_READ_OK && ret != BUFFERED_READER_READ_POLL_TIMEOUT)) + break; + + continue; + } + + if(deferred.enabled) { + char *s = (char *)buffer_tostring(buffer); + + if(strstr(&s[deferred.last_len], PLUGINSD_CALL_FUNCTION_PAYLOAD_END "\n") != NULL) { + if(deferred.last_len > 0) + // remove the trailing newline from the buffer + deferred.last_len--; + + s[deferred.last_len] = '\0'; + buffer->len = deferred.last_len; + buffer->content_type = content_type_string2id(deferred.content_type); + worker_add_job(wg, + PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN, deferred.transaction, deferred.function, + deferred.timeout_s, buffer, deferred.access, deferred.source); + buffer_flush(buffer); + + freez(deferred.transaction); + freez(deferred.function); + freez(deferred.timeout_s); + freez(deferred.access); + freez(deferred.source); + freez(deferred.content_type); + memset(&deferred, 0, sizeof(deferred)); + } + else + deferred.last_len = buffer->len; + + continue; + } + + char *words[MAX_FUNCTION_PARAMETERS] = { NULL }; + size_t num_words = quoted_strings_splitter_pluginsd((char *)buffer_tostring(buffer), words, MAX_FUNCTION_PARAMETERS); + + const char *keyword = get_word(words, num_words, 0); + + if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION) == 0)) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + char *access = get_word(words, num_words, 4); + char *source = get_word(words, num_words, 5); + worker_add_job(wg, keyword, transaction, function, timeout_s, NULL, access, source); + } + else if(keyword && (strcmp(keyword, PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN) == 0)) { + char *transaction = get_word(words, num_words, 1); + char *timeout_s = get_word(words, num_words, 2); + char *function = get_word(words, num_words, 3); + char *access = get_word(words, num_words, 4); + char *source = get_word(words, num_words, 5); + char *content_type = get_word(words, num_words, 6); + + deferred.transaction = strdupz(transaction ? transaction : ""); + deferred.timeout_s = strdupz(timeout_s ? timeout_s : ""); + deferred.function = strdupz(function ? function : ""); + deferred.access = strdupz(access ? access : ""); + deferred.source = strdupz(source ? source : ""); + deferred.content_type = strdupz(content_type ? content_type : ""); + deferred.last_len = 0; + deferred.enabled = true; + } + else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_CANCEL) == 0) { + char *transaction = get_word(words, num_words, 1); + const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); + if(acquired) { + struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); + __atomic_store_n(&j->cancelled, true, __ATOMIC_RELAXED); + dictionary_acquired_item_release(wg->worker_queue, acquired); + dictionary_del(wg->worker_queue, transaction); + dictionary_garbage_collect(wg->worker_queue); + } + else + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received CANCEL for transaction '%s', but it not available here", transaction); + } + else if(keyword && strcmp(keyword, PLUGINSD_CALL_FUNCTION_PROGRESS) == 0) { + char *transaction = get_word(words, num_words, 1); + const DICTIONARY_ITEM *acquired = dictionary_get_and_acquire_item(wg->worker_queue, transaction); + if(acquired) { + struct functions_evloop_worker_job *j = dictionary_acquired_item_value(acquired); + + functions_stop_monotonic_update_on_progress(&j->stop_monotonic_ut); + + dictionary_acquired_item_release(wg->worker_queue, acquired); + } + else + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received PROGRESS for transaction '%s', but it not available here", transaction); + } + else + nd_log(NDLS_COLLECTORS, NDLP_NOTICE, "Received unknown command: %s", keyword?keyword:"(unset)"); + + buffer_flush(buffer); + } + + if(!(*wg->plugin_should_exit)) + nd_log(NDLS_COLLECTORS, NDLP_ERR, "Read error on stdin"); + + *wg->plugin_should_exit = true; + exit(1); +} + +void worker_queue_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct functions_evloop_worker_job *j = value; + worker_job_cleanup(j); +} + +struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit) { + struct functions_evloop_globals *wg = callocz(1, sizeof(struct functions_evloop_globals)); + + wg->worker_queue = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE); + dictionary_register_delete_callback(wg->worker_queue, worker_queue_delete_cb, NULL); + + wg->dyncfg.nodes = dyncfg_nodes_dictionary_create(); + + pthread_mutex_init(&wg->worker_mutex, NULL); + pthread_cond_init(&wg->worker_cond_var, NULL); + + wg->plugin_should_exit = plugin_should_exit; + wg->stdout_mutex = stdout_mutex; + wg->workers = worker_threads; + wg->worker_threads = callocz(wg->workers, sizeof(ND_THREAD *)); + wg->tag = tag; + + char tag_buffer[NETDATA_THREAD_TAG_MAX + 1]; + snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_READER", wg->tag); + wg->reader_thread = nd_thread_create(tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, + rrd_functions_worker_globals_reader_main, wg); + + for(size_t i = 0; i < wg->workers ; i++) { + snprintfz(tag_buffer, NETDATA_THREAD_TAG_MAX, "%s_WORK[%zu]", wg->tag, i+1); + wg->worker_threads[i] = nd_thread_create(tag_buffer, NETDATA_THREAD_OPTION_DONT_LOG, + rrd_functions_worker_globals_worker_main, wg); + } + + functions_evloop_add_function(wg, "config", functions_evloop_config_cb, 120, wg); + + return wg; +} + +void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data) { + struct rrd_functions_expectation *we = callocz(1, sizeof(*we)); + we->function = function; + we->function_length = strlen(we->function); + we->cb = cb; + we->cb_data = data; + we->default_timeout = default_timeout; + DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wg->expectations, we, prev, next); +} + +void functions_evloop_cancel_threads(struct functions_evloop_globals *wg) { + nd_thread_signal_cancel(wg->reader_thread); + + for(size_t i = 0; i < wg->workers ; i++) + nd_thread_signal_cancel(wg->worker_threads[i]); +} + +// ---------------------------------------------------------------------------- + +static void functions_evloop_config_cb(const char *transaction, char *function, usec_t *stop_monotonic_ut, bool *cancelled, + BUFFER *payload, HTTP_ACCESS access, const char *source, void *data) { + struct functions_evloop_globals *wg = data; + + CLEAN_BUFFER *result = buffer_create(1024, NULL); + int code = dyncfg_node_find_and_call(wg->dyncfg.nodes, transaction, function, stop_monotonic_ut, + cancelled, payload, access, source, result); + + netdata_mutex_lock(wg->stdout_mutex); + pluginsd_function_result_begin_to_stdout(transaction, code, content_type_id2string(result->content_type), result->expires); + printf("%s", buffer_tostring(result)); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); + netdata_mutex_unlock(wg->stdout_mutex); +} + +void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path, + DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, + const char *source, DYNCFG_CMDS cmds, + HTTP_ACCESS view_access, HTTP_ACCESS edit_access, + dyncfg_cb_t cb, void *data) { + + if(!dyncfg_is_valid_id(id)) { + nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id); + return; + } + + struct dyncfg_node tmp = { + .cmds = cmds, + .type = type, + .cb = cb, + .data = data, + }; + dictionary_set(wg->dyncfg.nodes, id, &tmp, sizeof(tmp)); + + CLEAN_BUFFER *c = buffer_create(100, NULL); + dyncfg_cmds2buffer(cmds, c); + + netdata_mutex_lock(wg->stdout_mutex); + + fprintf(stdout, + PLUGINSD_KEYWORD_CONFIG " '%s' " PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE " '%s' '%s' '%s' '%s' '%s' '%s' "HTTP_ACCESS_FORMAT" "HTTP_ACCESS_FORMAT"\n", + id, + dyncfg_id2status(status), + dyncfg_id2type(type), path, + dyncfg_id2source_type(source_type), + source, + buffer_tostring(c), + (HTTP_ACCESS_FORMAT_CAST)view_access, + (HTTP_ACCESS_FORMAT_CAST)edit_access + ); + fflush(stdout); + + netdata_mutex_unlock(wg->stdout_mutex); +} + +void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id) { + if(!dyncfg_is_valid_id(id)) { + nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id); + return; + } + + dictionary_del(wg->dyncfg.nodes, id); + + netdata_mutex_lock(wg->stdout_mutex); + + fprintf(stdout, + PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "\n", + id); + fflush(stdout); + + netdata_mutex_unlock(wg->stdout_mutex); +} + +void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status) { + if(!dyncfg_is_valid_id(id)) { + nd_log(NDLS_COLLECTORS, NDLP_ERR, "DYNCFG: id '%s' is invalid. Ignoring dynamic configuration for it.", id); + return; + } + + netdata_mutex_lock(wg->stdout_mutex); + + fprintf(stdout, + PLUGINSD_KEYWORD_CONFIG " %s " PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS " %s\n", + id, dyncfg_id2status(status)); + + fflush(stdout); + + netdata_mutex_unlock(wg->stdout_mutex); +} diff --git a/src/libnetdata/functions_evloop/functions_evloop.h b/src/libnetdata/functions_evloop/functions_evloop.h new file mode 100644 index 000000000..5c575bd17 --- /dev/null +++ b/src/libnetdata/functions_evloop/functions_evloop.h @@ -0,0 +1,156 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_FUNCTIONS_EVLOOP_H +#define NETDATA_FUNCTIONS_EVLOOP_H + +#include "../libnetdata.h" + +#define MAX_FUNCTION_PARAMETERS 1024 +#define PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT 10 // seconds + +// plugins.d 1st version of the external plugins and streaming protocol +#define PLUGINSD_KEYWORD_CHART "CHART" +#define PLUGINSD_KEYWORD_CHART_DEFINITION_END "CHART_DEFINITION_END" +#define PLUGINSD_KEYWORD_DIMENSION "DIMENSION" +#define PLUGINSD_KEYWORD_BEGIN "BEGIN" +#define PLUGINSD_KEYWORD_SET "SET" +#define PLUGINSD_KEYWORD_END "END" +#define PLUGINSD_KEYWORD_FLUSH "FLUSH" +#define PLUGINSD_KEYWORD_DISABLE "DISABLE" +#define PLUGINSD_KEYWORD_VARIABLE "VARIABLE" +#define PLUGINSD_KEYWORD_LABEL "LABEL" +#define PLUGINSD_KEYWORD_OVERWRITE "OVERWRITE" +#define PLUGINSD_KEYWORD_CLABEL "CLABEL" +#define PLUGINSD_KEYWORD_CLABEL_COMMIT "CLABEL_COMMIT" +#define PLUGINSD_KEYWORD_EXIT "EXIT" + +// high-speed versions of BEGIN, SET, END +#define PLUGINSD_KEYWORD_BEGIN_V2 "BEGIN2" +#define PLUGINSD_KEYWORD_SET_V2 "SET2" +#define PLUGINSD_KEYWORD_END_V2 "END2" + +// super high-speed versions of BEGIN, SET, END have this as first parameter +// enabled with the streaming capability STREAM_CAP_SLOTS +#define PLUGINSD_KEYWORD_SLOT "SLOT" // to change the length of this, update pluginsd_extract_chart_slot() too + +// virtual hosts (only for external plugins - for streaming virtual hosts are like all other hosts) +#define PLUGINSD_KEYWORD_HOST_DEFINE "HOST_DEFINE" +#define PLUGINSD_KEYWORD_HOST_DEFINE_END "HOST_DEFINE_END" +#define PLUGINSD_KEYWORD_HOST_LABEL "HOST_LABEL" +#define PLUGINSD_KEYWORD_HOST "HOST" + +// replication +// enabled with STREAM_CAP_REPLICATION +#define PLUGINSD_KEYWORD_REPLAY_CHART "REPLAY_CHART" +#define PLUGINSD_KEYWORD_REPLAY_BEGIN "RBEGIN" +#define PLUGINSD_KEYWORD_REPLAY_SET "RSET" +#define PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE "RDSTATE" +#define PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE "RSSTATE" +#define PLUGINSD_KEYWORD_REPLAY_END "REND" + +// plugins.d accepts these for functions (from external plugins or streaming children) +// related to STREAM_CAP_FUNCTIONS, STREAM_CAP_PROGRESS +#define PLUGINSD_KEYWORD_FUNCTION "FUNCTION" // define a function +#define PLUGINSD_KEYWORD_FUNCTION_PROGRESS "FUNCTION_PROGRESS" // send updates about function progress +#define PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN "FUNCTION_RESULT_BEGIN" // the result of a function transaction +#define PLUGINSD_KEYWORD_FUNCTION_RESULT_END "FUNCTION_RESULT_END" // the end of the result of a func. trans. + +// plugins.d sends these for functions (to external plugins or streaming children) +// related to STREAM_CAP_FUNCTIONS, STREAM_CAP_PROGRESS +#define PLUGINSD_CALL_FUNCTION "FUNCTION" // call a function to a plugin or remote host +#define PLUGINSD_CALL_FUNCTION_PAYLOAD_BEGIN "FUNCTION_PAYLOAD" // call a function with a payload +#define PLUGINSD_CALL_FUNCTION_PAYLOAD_END "FUNCTION_PAYLOAD_END" // function payload ends +#define PLUGINSD_CALL_FUNCTION_CANCEL "FUNCTION_CANCEL" // cancel a running function transaction +#define PLUGINSD_CALL_FUNCTION_PROGRESS "FUNCTION_PROGRESS" // let the function know the user is waiting + +// dyncfg +// enabled with STREAM_CAP_DYNCFG +#define PLUGINSD_KEYWORD_CONFIG "CONFIG" +#define PLUGINSD_KEYWORD_CONFIG_ACTION_CREATE "create" +#define PLUGINSD_KEYWORD_CONFIG_ACTION_DELETE "delete" +#define PLUGINSD_KEYWORD_CONFIG_ACTION_STATUS "status" +#define PLUGINSD_FUNCTION_CONFIG "config" + +typedef void (*functions_evloop_worker_execute_t)(const char *transaction, char *function, usec_t *stop_monotonic_ut, + bool *cancelled, BUFFER *payload, HTTP_ACCESS access, + const char *source, void *data); + +struct functions_evloop_worker_job; +struct functions_evloop_globals *functions_evloop_init(size_t worker_threads, const char *tag, netdata_mutex_t *stdout_mutex, bool *plugin_should_exit); +void functions_evloop_add_function(struct functions_evloop_globals *wg, const char *function, functions_evloop_worker_execute_t cb, time_t default_timeout, void *data); +void functions_evloop_cancel_threads(struct functions_evloop_globals *wg); + +#define FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT (10 * USEC_PER_SEC) +static inline void functions_stop_monotonic_update_on_progress(usec_t *stop_monotonic_ut) { + usec_t now_ut = now_monotonic_usec(); + if(now_ut + FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT > *stop_monotonic_ut) { + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Extending function timeout due to PROGRESS update..."); + __atomic_store_n(stop_monotonic_ut, now_ut + FUNCTIONS_EXTENDED_TIME_ON_PROGRESS_UT, __ATOMIC_RELAXED); + } + else + nd_log(NDLS_DAEMON, NDLP_DEBUG, "Received PROGRESS update..."); +} + +#define pluginsd_function_result_begin_to_buffer(wb, transaction, code, content_type, expires) \ + buffer_sprintf(wb \ + , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ + , (transaction) ? (transaction) : "" \ + , (int)(code) \ + , (content_type) ? (content_type) : "" \ + , (long int)(expires) \ + ) + +#define pluginsd_function_result_end_to_buffer(wb) \ + buffer_strcat(wb, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") + +#define pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires) \ + fprintf(stdout \ + , PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " \"%s\" %d \"%s\" %ld\n" \ + , (transaction) ? (transaction) : "" \ + , (int)(code) \ + , (content_type) ? (content_type) : "" \ + , (long int)(expires) \ + ) + +#define pluginsd_function_result_end_to_stdout() \ + fprintf(stdout, "\n" PLUGINSD_KEYWORD_FUNCTION_RESULT_END "\n") + +static inline void pluginsd_function_json_error_to_stdout(const char *transaction, int code, const char *msg) { + char buffer[PLUGINSD_LINE_MAX + 1]; + json_escape_string(buffer, msg, PLUGINSD_LINE_MAX); + + pluginsd_function_result_begin_to_stdout(transaction, code, "application/json", now_realtime_sec()); + fprintf(stdout, "{\"status\":%d,\"error_message\":\"%s\"}", code, buffer); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); +} + +static inline void pluginsd_function_result_to_stdout(const char *transaction, int code, const char *content_type, time_t expires, BUFFER *result) { + pluginsd_function_result_begin_to_stdout(transaction, code, content_type, expires); + fwrite(buffer_tostring(result), buffer_strlen(result), 1, stdout); + pluginsd_function_result_end_to_stdout(); + fflush(stdout); +} + +static inline void pluginsd_function_progress_to_stdout(const char *transaction, size_t done, size_t all) { + fprintf(stdout, PLUGINSD_KEYWORD_FUNCTION_PROGRESS " '%s' %zu %zu\n", + transaction, done, all); + fflush(stdout); +} + +static inline void send_newline_and_flush(pthread_mutex_t *mutex) { + netdata_mutex_lock(mutex); + fprintf(stdout, "\n"); + fflush(stdout); + netdata_mutex_unlock(mutex); +} + +void functions_evloop_dyncfg_add(struct functions_evloop_globals *wg, const char *id, const char *path, + DYNCFG_STATUS status, DYNCFG_TYPE type, DYNCFG_SOURCE_TYPE source_type, const char *source, DYNCFG_CMDS cmds, + HTTP_ACCESS view_access, HTTP_ACCESS edit_access, + dyncfg_cb_t cb, void *data); + +void functions_evloop_dyncfg_del(struct functions_evloop_globals *wg, const char *id); +void functions_evloop_dyncfg_status(struct functions_evloop_globals *wg, const char *id, DYNCFG_STATUS status); + +#endif //NETDATA_FUNCTIONS_EVLOOP_H |