summaryrefslogtreecommitdiffstats
path: root/database/rrdfunctions.c
diff options
context:
space:
mode:
Diffstat (limited to 'database/rrdfunctions.c')
-rw-r--r--database/rrdfunctions.c755
1 files changed, 570 insertions, 185 deletions
diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c
index d32a4b8c9..6c5baf346 100644
--- a/database/rrdfunctions.c
+++ b/database/rrdfunctions.c
@@ -1,3 +1,4 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
#define NETDATA_RRD_INTERNALS
#include "rrd.h"
@@ -37,17 +38,17 @@ static unsigned char functions_allowed_chars[256] = {
[30] = '_', //
[31] = '_', //
[32] = ' ', // SPACE keep
- [33] = '_', // !
- [34] = '_', // "
- [35] = '_', // #
- [36] = '_', // $
- [37] = '_', // %
- [38] = '_', // &
- [39] = '_', // '
- [40] = '_', // (
- [41] = '_', // )
- [42] = '_', // *
- [43] = '_', // +
+ [33] = '!', // ! keep
+ [34] = '"', // " keep
+ [35] = '#', // # keep
+ [36] = '$', // $ keep
+ [37] = '%', // % keep
+ [38] = '&', // & keep
+ [39] = '\'', // ' keep
+ [40] = '(', // ( keep
+ [41] = ')', // ) keep
+ [42] = '*', // * keep
+ [43] = '+', // + keep
[44] = ',', // , keep
[45] = '-', // - keep
[46] = '.', // . keep
@@ -63,12 +64,12 @@ static unsigned char functions_allowed_chars[256] = {
[56] = '8', // 8 keep
[57] = '9', // 9 keep
[58] = ':', // : keep
- [59] = ':', // ; convert ; to :
- [60] = '_', // <
- [61] = ':', // = convert = to :
- [62] = '_', // >
- [63] = '_', // ?
- [64] = '_', // @
+ [59] = ';', // ; keep
+ [60] = '<', // < keep
+ [61] = '=', // = keep
+ [62] = '>', // > keep
+ [63] = '?', // ? keep
+ [64] = '@', // @ keep
[65] = 'A', // A keep
[66] = 'B', // B keep
[67] = 'C', // C keep
@@ -95,12 +96,12 @@ static unsigned char functions_allowed_chars[256] = {
[88] = 'X', // X keep
[89] = 'Y', // Y keep
[90] = 'Z', // Z keep
- [91] = '_', // [
- [92] = '/', // backslash convert \ to /
- [93] = '_', // ]
- [94] = '_', // ^
+ [91] = '[', // [ keep
+ [92] = '\\', // backslash keep
+ [93] = ']', // ] keep
+ [94] = '^', // ^ keep
[95] = '_', // _ keep
- [96] = '_', // `
+ [96] = '`', // ` keep
[97] = 'a', // a keep
[98] = 'b', // b keep
[99] = 'c', // c keep
@@ -127,10 +128,10 @@ static unsigned char functions_allowed_chars[256] = {
[120] = 'x', // x keep
[121] = 'y', // y keep
[122] = 'z', // z keep
- [123] = '_', // {
- [124] = '_', // |
- [125] = '_', // }
- [126] = '_', // ~
+ [123] = '{', // { keep
+ [124] = '|', // | keep
+ [125] = '}', // } keep
+ [126] = '~', // ~ keep
[127] = '_', //
[128] = '_', //
[129] = '_', //
@@ -277,16 +278,15 @@ typedef enum __attribute__((packed)) {
// this is 8-bit
} RRD_FUNCTION_OPTIONS;
-struct rrd_collector_function {
+struct rrd_host_function {
bool sync; // when true, the function is called synchronously
RRD_FUNCTION_OPTIONS options; // RRD_FUNCTION_OPTIONS
STRING *help;
int timeout; // the default timeout of the function
- int (*function)(BUFFER *wb, int timeout, const char *function, void *collector_data,
- function_data_ready_callback callback, void *callback_data);
+ rrd_function_execute_cb_t execute_cb;
- void *collector_data;
+ void *execute_cb_data;
struct rrd_collector *collector;
};
@@ -299,6 +299,7 @@ struct rrd_collector_function {
struct rrd_collector {
int32_t refcount;
+ int32_t refcount_canceller;
pid_t tid;
bool running;
};
@@ -310,8 +311,11 @@ struct rrd_collector {
static __thread struct rrd_collector *thread_rrd_collector = NULL;
static void rrd_collector_free(struct rrd_collector *rdc) {
+ if(rdc->running)
+ return;
+
int32_t expected = 0;
- if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) {
+ if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) {
// the collector is still referenced by charts.
// leave it hanging there, the last chart will actually free it.
return;
@@ -323,11 +327,11 @@ static void rrd_collector_free(struct rrd_collector *rdc) {
// called once per collector
void rrd_collector_started(void) {
- if(likely(thread_rrd_collector)) return;
+ if(!thread_rrd_collector)
+ thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
- thread_rrd_collector = callocz(1, sizeof(struct rrd_collector));
thread_rrd_collector->tid = gettid();
- thread_rrd_collector->running = true;
+ __atomic_store_n(&thread_rrd_collector->running, true, __ATOMIC_RELAXED);
}
// called once per collector
@@ -335,65 +339,110 @@ void rrd_collector_finished(void) {
if(!thread_rrd_collector)
return;
- thread_rrd_collector->running = false;
+ __atomic_store_n(&thread_rrd_collector->running, false, __ATOMIC_RELAXED);
+
+ int32_t expected = 0;
+ while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount_canceller, &expected, -1, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
+ expected = 0;
+ sleep_usec(1 * USEC_PER_MS);
+ }
+
rrd_collector_free(thread_rrd_collector);
thread_rrd_collector = NULL;
}
+#define rrd_collector_running(c) __atomic_load_n(&(c)->running, __ATOMIC_RELAXED)
+
static struct rrd_collector *rrd_collector_acquire(void) {
- __atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST);
+ rrd_collector_started();
+
+ int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0;
+ do {
+ if(expected < 0 || !rrd_collector_running(thread_rrd_collector)) {
+ internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting.");
+ return thread_rrd_collector;
+ }
+
+ wanted = expected + 1;
+
+ } while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED));
+
return thread_rrd_collector;
}
static void rrd_collector_release(struct rrd_collector *rdc) {
if(unlikely(!rdc)) return;
- int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST);
- if(refcount == 0 && !rdc->running)
+ int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0;
+ do {
+ if(expected < 0) {
+ internal_fatal(true, "FUNCTIONS: Trying to release a collector that is exiting.");
+ return;
+ }
+
+ if(expected == 0) {
+ internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired.");
+ return;
+ }
+
+ wanted = expected - 1;
+
+ } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED));
+
+ if(wanted == 0)
rrd_collector_free(rdc);
}
-static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
- void *rrdhost __maybe_unused) {
- struct rrd_collector_function *rdcf = func;
-
- if(!thread_rrd_collector)
- fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
- __FUNCTION__, dictionary_acquired_item_name(item));
+static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) {
+ RRDHOST *host = rrdhost; (void)host;
+ struct rrd_host_function *rdcf = func;
+ rrd_collector_started();
rdcf->collector = rrd_collector_acquire();
+
+// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s",
+// dictionary_acquired_item_name(item), rrdhost_hostname(host),
+// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running");
}
-static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
+static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
void *rrdhost __maybe_unused) {
- struct rrd_collector_function *rdcf = func;
+ struct rrd_host_function *rdcf = func;
rrd_collector_release(rdcf->collector);
}
-static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused,
- void *new_func __maybe_unused, void *rrdhost __maybe_unused) {
- struct rrd_collector_function *rdcf = func;
- struct rrd_collector_function *new_rdcf = new_func;
+static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func,
+ void *new_func, void *rrdhost) {
+ RRDHOST *host = rrdhost; (void)host;
+ struct rrd_host_function *rdcf = func;
+ struct rrd_host_function *new_rdcf = new_func;
- if(!thread_rrd_collector)
- fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.",
- __FUNCTION__, dictionary_acquired_item_name(item));
+ rrd_collector_started();
bool changed = false;
if(rdcf->collector != thread_rrd_collector) {
+ netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed collector from %d to %d",
+ dictionary_acquired_item_name(item), rrdhost_hostname(host), rdcf->collector->tid, thread_rrd_collector->tid);
+
struct rrd_collector *old_rdc = rdcf->collector;
rdcf->collector = rrd_collector_acquire();
rrd_collector_release(old_rdc);
changed = true;
}
- if(rdcf->function != new_rdcf->function) {
- rdcf->function = new_rdcf->function;
+ if(rdcf->execute_cb != new_rdcf->execute_cb) {
+ netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback",
+ dictionary_acquired_item_name(item), rrdhost_hostname(host));
+
+ rdcf->execute_cb = new_rdcf->execute_cb;
changed = true;
}
if(rdcf->help != new_rdcf->help) {
+ netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed help text",
+ dictionary_acquired_item_name(item), rrdhost_hostname(host));
+
STRING *old = rdcf->help;
rdcf->help = new_rdcf->help;
string_freez(old);
@@ -403,41 +452,53 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_
string_freez(new_rdcf->help);
if(rdcf->timeout != new_rdcf->timeout) {
+ netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed timeout",
+ dictionary_acquired_item_name(item), rrdhost_hostname(host));
+
rdcf->timeout = new_rdcf->timeout;
changed = true;
}
if(rdcf->sync != new_rdcf->sync) {
+ netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed sync/async mode",
+ dictionary_acquired_item_name(item), rrdhost_hostname(host));
+
rdcf->sync = new_rdcf->sync;
changed = true;
}
- if(rdcf->collector_data != new_rdcf->collector_data) {
- rdcf->collector_data = new_rdcf->collector_data;
+ if(rdcf->execute_cb_data != new_rdcf->execute_cb_data) {
+ netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback data",
+ dictionary_acquired_item_name(item), rrdhost_hostname(host));
+
+ rdcf->execute_cb_data = new_rdcf->execute_cb_data;
changed = true;
}
+// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s",
+// dictionary_acquired_item_name(item), rrdhost_hostname(host),
+// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running");
+
return changed;
}
-
-void rrdfunctions_init(RRDHOST *host) {
+void rrdfunctions_host_init(RRDHOST *host) {
if(host->functions) return;
host->functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
- &dictionary_stats_category_functions, sizeof(struct rrd_collector_function));
+ &dictionary_stats_category_functions, sizeof(struct rrd_host_function));
dictionary_register_insert_callback(host->functions, rrd_functions_insert_callback, host);
dictionary_register_delete_callback(host->functions, rrd_functions_delete_callback, host);
dictionary_register_conflict_callback(host->functions, rrd_functions_conflict_callback, host);
}
-void rrdfunctions_destroy(RRDHOST *host) {
+void rrdfunctions_host_destroy(RRDHOST *host) {
dictionary_destroy(host->functions);
}
-void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help,
- bool sync, function_execute_at_collector function, void *collector_data) {
+void rrd_function_add(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help,
+ bool sync, rrd_function_execute_cb_t execute_cb, void *execute_cb_data) {
// RRDSET *st may be NULL in this function
// to create a GLOBAL function
@@ -448,18 +509,20 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int
char key[PLUGINSD_LINE_MAX + 1];
sanitize_function_text(key, name, PLUGINSD_LINE_MAX);
- struct rrd_collector_function tmp = {
+ struct rrd_host_function tmp = {
.sync = sync,
.timeout = timeout,
.options = (st)?RRD_FUNCTION_LOCAL:RRD_FUNCTION_GLOBAL,
- .function = function,
- .collector_data = collector_data,
+ .execute_cb = execute_cb,
+ .execute_cb_data = execute_cb_data,
.help = string_strdupz(help),
};
const DICTIONARY_ITEM *item = dictionary_set_and_acquire_item(host->functions, key, &tmp, sizeof(tmp));
if(st)
dictionary_view_set(st->functions_view, key, item);
+ else
+ rrdhost_flag_set(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);
dictionary_acquired_item_release(host->functions, item);
}
@@ -468,7 +531,7 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
if(!st->functions_view)
return;
- struct rrd_collector_function *tmp;
+ struct rrd_host_function *tmp;
dfe_start_read(st->functions_view, tmp) {
buffer_sprintf(wb
, PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n"
@@ -481,7 +544,9 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) {
}
void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) {
- struct rrd_collector_function *tmp;
+ rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED);
+
+ struct rrd_host_function *tmp;
dfe_start_read(host->functions, tmp) {
if(!(tmp->options & RRD_FUNCTION_GLOBAL))
continue;
@@ -496,20 +561,6 @@ void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) {
dfe_done(tmp);
}
-struct rrd_function_call_wait {
- bool free_with_signal;
- bool data_are_ready;
- netdata_mutex_t mutex;
- pthread_cond_t cond;
- int code;
-};
-
-static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) {
- pthread_cond_destroy(&tmp->cond);
- netdata_mutex_destroy(&tmp->mutex);
- freez(tmp);
-}
-
struct {
const char *format;
HTTP_CONTENT_TYPE content_type;
@@ -558,41 +609,171 @@ int rrd_call_function_error(BUFFER *wb, const char *msg, int code) {
return code;
}
-static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, struct rrd_collector_function **rdcf) {
+static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, const DICTIONARY_ITEM **item) {
char buffer[MAX_FUNCTION_LENGTH + 1];
strncpyz(buffer, name, MAX_FUNCTION_LENGTH);
char *s = NULL;
- *rdcf = NULL;
- while(!(*rdcf) && buffer[0]) {
- *rdcf = dictionary_get(host->functions, buffer);
- if(*rdcf) break;
+ bool found = false;
+ *item = NULL;
+ if(host->functions) {
+ while (buffer[0]) {
+ if((*item = dictionary_get_and_acquire_item(host->functions, buffer))) {
+ found = true;
+
+ struct rrd_host_function *rdcf = dictionary_acquired_item_value(*item);
+ if(rrd_collector_running(rdcf->collector)) {
+ break;
+ }
+ else {
+ dictionary_acquired_item_release(host->functions, *item);
+ *item = NULL;
+ }
+ }
- // if s == NULL, set it to the end of the buffer
- // this should happen only the first time
- if(unlikely(!s))
- s = &buffer[key_length - 1];
+ // if s == NULL, set it to the end of the buffer
+ // this should happen only the first time
+ if (unlikely(!s))
+ s = &buffer[key_length - 1];
- // skip a word from the end
- while(s >= buffer && !isspace(*s)) *s-- = '\0';
+ // skip a word from the end
+ while (s >= buffer && !isspace(*s)) *s-- = '\0';
- // skip all spaces
- while(s >= buffer && isspace(*s)) *s-- = '\0';
+ // skip all spaces
+ while (s >= buffer && isspace(*s)) *s-- = '\0';
+ }
}
buffer_flush(wb);
- if(!(*rdcf))
- return rrd_call_function_error(wb, "No collector is supplying this function on this host at this time.", HTTP_RESP_NOT_FOUND);
-
- if(!(*rdcf)->collector->running)
- return rrd_call_function_error(wb, "The collector that registered this function, is not currently running.", HTTP_RESP_BACKEND_FETCH_FAILED);
+ if(!(*item)) {
+ if(found)
+ return rrd_call_function_error(wb,
+ "The collector that registered this function, is not currently running.",
+ HTTP_RESP_SERVICE_UNAVAILABLE);
+ else
+ return rrd_call_function_error(wb,
+ "No collector is supplying this function on this host at this time.",
+ HTTP_RESP_NOT_FOUND);
+ }
return HTTP_RESP_OK;
}
-static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) {
+// ----------------------------------------------------------------------------
+
+struct rrd_function_inflight {
+ bool used;
+
+ RRDHOST *host;
+ const char *transaction;
+ const char *cmd;
+ const char *sanitized_cmd;
+ size_t sanitized_cmd_length;
+ int timeout;
+ bool cancelled;
+
+ const DICTIONARY_ITEM *host_function_acquired;
+
+ // the collector
+ // we acquire this structure at the beginning,
+ // and we release it at the end
+ struct rrd_host_function *rdcf;
+
+ struct {
+ BUFFER *wb;
+
+ // in async mode,
+ // the function to call to send the result back
+ rrd_function_result_callback_t cb;
+ void *data;
+ } result;
+
+ struct {
+ // to be called in sync mode
+ // while the function is running
+ // to check if the function has been cancelled
+ rrd_function_is_cancelled_cb_t cb;
+ void *data;
+ } is_cancelled;
+
+ struct {
+ // to be registered by the function itself
+ // used to signal the function to cancel
+ rrd_function_canceller_cb_t cb;
+ void *data;
+ } canceller;
+};
+
+static DICTIONARY *rrd_functions_inflight_requests = NULL;
+
+static void rrd_functions_inflight_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) {
+ struct rrd_function_inflight *r = value;
+
+ // internal_error(true, "FUNCTIONS: transaction '%s' finished", r->transaction);
+
+ freez((void *)r->transaction);
+ freez((void *)r->cmd);
+ freez((void *)r->sanitized_cmd);
+ dictionary_acquired_item_release(r->host->functions, r->host_function_acquired);
+}
+
+void rrd_functions_inflight_init(void) {
+ if(rrd_functions_inflight_requests)
+ return;
+
+ rrd_functions_inflight_requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrd_function_inflight));
+
+ dictionary_register_delete_callback(rrd_functions_inflight_requests, rrd_functions_inflight_delete_cb, NULL);
+}
+
+void rrd_functions_inflight_destroy(void) {
+ if(!rrd_functions_inflight_requests)
+ return;
+
+ dictionary_destroy(rrd_functions_inflight_requests);
+ rrd_functions_inflight_requests = NULL;
+}
+
+static void rrd_inflight_async_function_register_canceller_cb(void *register_canceller_cb_data, rrd_function_canceller_cb_t canceller_cb, void *canceller_cb_data) {
+ struct rrd_function_inflight *r = register_canceller_cb_data;
+ r->canceller.cb = canceller_cb;
+ r->canceller.data = canceller_cb_data;
+}
+
+// ----------------------------------------------------------------------------
+// waiting for async function completion
+
+struct rrd_function_call_wait {
+ RRDHOST *host;
+ const DICTIONARY_ITEM *host_function_acquired;
+ char *transaction;
+
+ bool free_with_signal;
+ bool data_are_ready;
+ netdata_mutex_t mutex;
+ pthread_cond_t cond;
+ int code;
+};
+
+static void rrd_inflight_function_cleanup(RRDHOST *host __maybe_unused,
+ const DICTIONARY_ITEM *host_function_acquired __maybe_unused,
+ const char *transaction) {
+ dictionary_del(rrd_functions_inflight_requests, transaction);
+ dictionary_garbage_collect(rrd_functions_inflight_requests);
+}
+
+static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) {
+ rrd_inflight_function_cleanup(tmp->host, tmp->host_function_acquired, tmp->transaction);
+ freez(tmp->transaction);
+
+ pthread_cond_destroy(&tmp->cond);
+ netdata_mutex_destroy(&tmp->mutex);
+ freez(tmp);
+}
+
+static void rrd_async_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) {
struct rrd_function_call_wait *tmp = callback_data;
bool we_should_free = false;
@@ -618,115 +799,308 @@ static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused,
}
}
-int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name) {
- int code;
+static void rrd_inflight_async_function_nowait_finished(BUFFER *wb, int code, void *data) {
+ struct rrd_function_inflight *r = data;
- struct rrd_collector_function *rdcf = NULL;
+ if(r->result.cb)
+ r->result.cb(wb, code, r->result.data);
- char key[PLUGINSD_LINE_MAX + 1];
- size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX);
- code = rrd_call_function_find(host, wb, key, key_length, &rdcf);
- if(code != HTTP_RESP_OK)
- return code;
+ rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction);
+}
- if(timeout <= 0)
- timeout = rdcf->timeout;
+static bool rrd_inflight_async_function_is_cancelled(void *data) {
+ struct rrd_function_inflight *r = data;
+ return __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED);
+}
- struct timespec tp;
- clock_gettime(CLOCK_REALTIME, &tp);
- tp.tv_sec += (time_t)timeout;
+static inline int rrd_call_function_async_and_dont_wait(struct rrd_function_inflight *r) {
+ int code = r->rdcf->execute_cb(r->result.wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data,
+ rrd_inflight_async_function_nowait_finished, r,
+ rrd_inflight_async_function_is_cancelled, r,
+ rrd_inflight_async_function_register_canceller_cb, r);
- if(rdcf->sync) {
- code = rdcf->function(wb, timeout, key, rdcf->collector_data, NULL, NULL);
+ if(code != HTTP_RESP_OK) {
+ if (!buffer_strlen(r->result.wb))
+ rrd_call_function_error(r->result.wb, "Failed to send request to the collector.", code);
+
+ rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction);
}
- else {
- struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait));
- tmp->free_with_signal = false;
- tmp->data_are_ready = false;
- netdata_mutex_init(&tmp->mutex);
- pthread_cond_init(&tmp->cond, NULL);
-
- bool we_should_free = true;
- BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it
- temp_wb->content_type = wb->content_type;
- code = rdcf->function(temp_wb, timeout, key, rdcf->collector_data, rrd_call_function_signal_when_ready, tmp);
- if (code == HTTP_RESP_OK) {
- netdata_mutex_lock(&tmp->mutex);
-
- int rc = 0;
- while (rc == 0 && !tmp->data_are_ready) {
- // the mutex is unlocked within pthread_cond_timedwait()
- rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp);
- // the mutex is again ours
- }
- if (tmp->data_are_ready) {
- // we have a response
- buffer_fast_strcat(wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb));
- wb->content_type = temp_wb->content_type;
- wb->expires = temp_wb->expires;
+ return code;
+}
- if(wb->expires)
- buffer_cacheable(wb);
- else
- buffer_no_cacheable(wb);
+static int rrd_call_function_async_and_wait(struct rrd_function_inflight *r) {
+ struct timespec tp;
+ clock_gettime(CLOCK_REALTIME, &tp);
+ usec_t now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC;
+ usec_t end_ut = now_ut + r->timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT;
+
+ struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait));
+ tmp->free_with_signal = false;
+ tmp->data_are_ready = false;
+ tmp->host = r->host;
+ tmp->host_function_acquired = r->host_function_acquired;
+ tmp->transaction = strdupz(r->transaction);
+ netdata_mutex_init(&tmp->mutex);
+ pthread_cond_init(&tmp->cond, NULL);
+
+ // we need a temporary BUFFER, because we may time out and the caller supplied one may vanish
+ // so, we create a new one we guarantee will survive until the collector finishes...
+
+ bool we_should_free = true;
+ BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it
+ temp_wb->content_type = r->result.wb->content_type;
+
+ int code = r->rdcf->execute_cb(temp_wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data,
+ // we overwrite the result callbacks,
+ // so that we can clean up the allocations made
+ rrd_async_function_signal_when_ready, tmp,
+ rrd_inflight_async_function_is_cancelled, r,
+ rrd_inflight_async_function_register_canceller_cb, r);
+
+ if (code == HTTP_RESP_OK) {
+ netdata_mutex_lock(&tmp->mutex);
+
+ bool cancelled = false;
+ int rc = 0;
+ while (rc == 0 && !cancelled && !tmp->data_are_ready) {
+ clock_gettime(CLOCK_REALTIME, &tp);
+ now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC;
+
+ if(now_ut >= end_ut) {
+ rc = ETIMEDOUT;
+ break;
+ }
- code = tmp->code;
+ tp.tv_nsec += 10 * NSEC_PER_MSEC;
+ if(tp.tv_nsec > (long)(1 * NSEC_PER_SEC)) {
+ tp.tv_sec++;
+ tp.tv_nsec -= 1 * NSEC_PER_SEC;
}
- else if (rc == ETIMEDOUT) {
- // timeout
- // we will go away and let the callback free the structure
- tmp->free_with_signal = true;
- we_should_free = false;
- code = rrd_call_function_error(wb, "Timeout while waiting for a response from the collector.", HTTP_RESP_GATEWAY_TIMEOUT);
+
+ // the mutex is unlocked within pthread_cond_timedwait()
+ rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp);
+ // the mutex is again ours
+
+ if(rc == ETIMEDOUT) {
+ rc = 0;
+ if (!tmp->data_are_ready && r->is_cancelled.cb &&
+ r->is_cancelled.cb(r->is_cancelled.data)) {
+// internal_error(true, "FUNCTIONS: transaction '%s' is cancelled while waiting for response",
+// r->transaction);
+ rc = 0;
+ cancelled = true;
+ rrd_function_cancel(r->transaction);
+ break;
+ }
}
+ }
+
+ if (tmp->data_are_ready) {
+ // we have a response
+ buffer_fast_strcat(r->result.wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb));
+ r->result.wb->content_type = temp_wb->content_type;
+ r->result.wb->expires = temp_wb->expires;
+
+ if(r->result.wb->expires)
+ buffer_cacheable(r->result.wb);
else
- code = rrd_call_function_error(wb, "Failed to get the response from the collector.", HTTP_RESP_INTERNAL_SERVER_ERROR);
+ buffer_no_cacheable(r->result.wb);
- netdata_mutex_unlock(&tmp->mutex);
+ code = tmp->code;
}
- else {
- if(!buffer_strlen(wb))
- rrd_call_function_error(wb, "Failed to send request to the collector.", code);
+ else if (rc == ETIMEDOUT || cancelled) {
+ // timeout
+ // we will go away and let the callback free the structure
+ tmp->free_with_signal = true;
+ we_should_free = false;
+
+ if(cancelled)
+ code = rrd_call_function_error(r->result.wb,
+ "Request cancelled",
+ HTTP_RESP_CLIENT_CLOSED_REQUEST);
+ else
+ code = rrd_call_function_error(r->result.wb,
+ "Timeout while waiting for a response from the collector.",
+ HTTP_RESP_GATEWAY_TIMEOUT);
}
+ else
+ code = rrd_call_function_error(r->result.wb,
+ "Internal error while communicating with the collector",
+ HTTP_RESP_INTERNAL_SERVER_ERROR);
- if (we_should_free) {
- rrd_function_call_wait_free(tmp);
- buffer_free(temp_wb);
- }
+ netdata_mutex_unlock(&tmp->mutex);
+ }
+ else {
+ if(!buffer_strlen(r->result.wb))
+ rrd_call_function_error(r->result.wb, "The collector returned an error.", code);
+ }
+
+ if (we_should_free) {
+ rrd_function_call_wait_free(tmp);
+ buffer_free(temp_wb);
}
return code;
}
-int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name,
- rrd_call_function_async_callback callback, void *callback_data) {
+static inline int rrd_call_function_async(struct rrd_function_inflight *r, bool wait) {
+ if(wait)
+ return rrd_call_function_async_and_wait(r);
+ else
+ return rrd_call_function_async_and_dont_wait(r);
+}
+
+
+void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data);
+// ----------------------------------------------------------------------------
+
+int rrd_function_run(RRDHOST *host, BUFFER *result_wb, int timeout, const char *cmd,
+ bool wait, const char *transaction,
+ rrd_function_result_callback_t result_cb, void *result_cb_data,
+ rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data, const char *payload) {
+
int code;
+ char sanitized_cmd[PLUGINSD_LINE_MAX + 1];
+ const DICTIONARY_ITEM *host_function_acquired = NULL;
- struct rrd_collector_function *rdcf = NULL;
- char key[PLUGINSD_LINE_MAX + 1];
- size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX);
- code = rrd_call_function_find(host, wb, key, key_length, &rdcf);
+ // ------------------------------------------------------------------------
+ // find the function
+
+ size_t sanitized_cmd_length = sanitize_function_text(sanitized_cmd, cmd, PLUGINSD_LINE_MAX);
+
+ if (is_dyncfg_function(sanitized_cmd, DYNCFG_FUNCTION_TYPE_ALL)) {
+ call_virtual_function_async(result_wb, host, sanitized_cmd, payload, result_cb, result_cb_data);
+ return HTTP_RESP_OK;
+ }
+
+ code = rrd_call_function_find(host, result_wb, sanitized_cmd, sanitized_cmd_length, &host_function_acquired);
if(code != HTTP_RESP_OK)
return code;
+ struct rrd_host_function *rdcf = dictionary_acquired_item_value(host_function_acquired);
+
if(timeout <= 0)
timeout = rdcf->timeout;
- code = rdcf->function(wb, timeout, key, rdcf->collector_data, callback, callback_data);
- if(code != HTTP_RESP_OK) {
- if (!buffer_strlen(wb))
- rrd_call_function_error(wb, "Failed to send request to the collector.", code);
+ // ------------------------------------------------------------------------
+ // the function can only be executed in sync mode
+
+ if(rdcf->sync) {
+ // the caller has to wait
+
+ code = rdcf->execute_cb(result_wb, timeout, sanitized_cmd, rdcf->execute_cb_data,
+ NULL, NULL, // no callback needed, it is synchronous
+ is_cancelled_cb, is_cancelled_cb_data, // it is ok to pass these, we block the caller
+ NULL, NULL); // no need to pass, we will wait
+
+ if (code != HTTP_RESP_OK && !buffer_strlen(result_wb))
+ rrd_call_function_error(result_wb, "Collector reported error.", code);
+
+ dictionary_acquired_item_release(host->functions, host_function_acquired);
+ return code;
}
- return code;
+
+ // ------------------------------------------------------------------------
+ // the function can only be executed in async mode
+ // put the function into the inflight requests
+
+ char uuid_str[UUID_STR_LEN];
+ if(!transaction) {
+ uuid_t uuid;
+ uuid_generate_random(uuid);
+ uuid_unparse_lower(uuid, uuid_str);
+ transaction = uuid_str;
+ }
+
+ // put the request into the inflight requests
+ struct rrd_function_inflight t = {
+ .used = false,
+ .host = host,
+ .cmd = strdupz(cmd),
+ .sanitized_cmd = strdupz(sanitized_cmd),
+ .sanitized_cmd_length = sanitized_cmd_length,
+ .transaction = strdupz(transaction),
+ .timeout = timeout,
+ .cancelled = false,
+ .host_function_acquired = host_function_acquired,
+ .rdcf = rdcf,
+ .result = {
+ .wb = result_wb,
+ .cb = result_cb,
+ .data = result_cb_data,
+ },
+ .is_cancelled = {
+ .cb = is_cancelled_cb,
+ .data = is_cancelled_cb_data,
+ }
+ };
+ struct rrd_function_inflight *r = dictionary_set(rrd_functions_inflight_requests, transaction, &t, sizeof(t));
+ if(r->used) {
+ netdata_log_info("FUNCTIONS: duplicate transaction '%s', function: '%s'", t.transaction, t.cmd);
+ code = rrd_call_function_error(result_wb, "duplicate transaction", HTTP_RESP_BAD_REQUEST);
+ freez((void *)t.transaction);
+ freez((void *)t.cmd);
+ freez((void *)t.sanitized_cmd);
+ dictionary_acquired_item_release(r->host->functions, t.host_function_acquired);
+ return code;
+ }
+ r->used = true;
+ // internal_error(true, "FUNCTIONS: transaction '%s' started", r->transaction);
+
+ return rrd_call_function_async(r, wait);
}
+void rrd_function_cancel(const char *transaction) {
+ // internal_error(true, "FUNCTIONS: request to cancel transaction '%s'", transaction);
+
+ const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(rrd_functions_inflight_requests, transaction);
+ if(!item) {
+ netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the transaction is not running.",
+ transaction);
+ return;
+ }
+
+ struct rrd_function_inflight *r = dictionary_acquired_item_value(item);
+
+ bool cancelled = __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED);
+ if(cancelled) {
+ netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but it is already cancelled.",
+ transaction);
+ goto cleanup;
+ }
+
+ __atomic_store_n(&r->cancelled, true, __ATOMIC_RELAXED);
+
+ int32_t expected = __atomic_load_n(&r->rdcf->collector->refcount_canceller, __ATOMIC_RELAXED);
+ int32_t wanted;
+ do {
+ if(expected < 0) {
+ netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the collector is not running.",
+ transaction);
+ goto cleanup;
+ }
+
+ wanted = expected + 1;
+ } while(!__atomic_compare_exchange_n(&r->rdcf->collector->refcount_canceller, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED));
+
+ if(r->canceller.cb)
+ r->canceller.cb(r->canceller.data);
+
+ __atomic_sub_fetch(&r->rdcf->collector->refcount_canceller, 1, __ATOMIC_RELAXED);
+
+cleanup:
+ dictionary_acquired_item_release(rrd_functions_inflight_requests, item);
+}
+
+// ----------------------------------------------------------------------------
+
static void functions2json(DICTIONARY *functions, BUFFER *wb, const char *ident, const char *kq, const char *sq) {
- struct rrd_collector_function *t;
+ struct rrd_host_function *t;
dfe_start_read(functions, t) {
- if(!t->collector->running) continue;
+ if(!rrd_collector_running(t->collector)) continue;
if(t_dfe.counter)
buffer_strcat(wb, ",\n");
@@ -759,9 +1133,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) {
buffer_json_member_add_object(wb, "functions");
- struct rrd_collector_function *t;
+ struct rrd_host_function *t;
dfe_start_read(host->functions, t) {
- if(!t->collector->running) continue;
+ if(!rrd_collector_running(t->collector)) continue;
buffer_json_member_add_object(wb, t_dfe.name);
buffer_json_member_add_string(wb, "help", string2str(t->help));
@@ -782,9 +1156,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) {
void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size) {
if(!rrdset_functions_view || !dst) return;
- struct rrd_collector_function *t;
+ struct rrd_host_function *t;
dfe_start_read(rrdset_functions_view, t) {
- if(!t->collector->running) continue;
+ if(!rrd_collector_running(t->collector)) continue;
dictionary_set(dst, t_dfe.name, value, value_size);
}
@@ -794,9 +1168,9 @@ void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst,
void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help) {
if(!host || !host->functions || !dictionary_entries(host->functions) || !dst) return;
- struct rrd_collector_function *t;
+ struct rrd_host_function *t;
dfe_start_read(host->functions, t) {
- if(!t->collector->running) continue;
+ if(!rrd_collector_running(t->collector)) continue;
if(help)
*help = t->help;
@@ -806,10 +1180,15 @@ void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t
dfe_done(t);
}
+// ----------------------------------------------------------------------------
int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused,
void *collector_data __maybe_unused,
- function_data_ready_callback callback __maybe_unused, void *callback_data __maybe_unused) {
+ rrd_function_result_callback_t result_cb, void *result_cb_data,
+ rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data,
+ rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused,
+ void *register_canceller_cb_data __maybe_unused) {
+
time_t now = now_realtime_sec();
buffer_flush(wb);
@@ -1428,8 +1807,14 @@ int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const cha
buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + 1);
buffer_json_finalize(wb);
- if(callback)
- callback(wb, HTTP_RESP_OK, callback_data);
+ int response = HTTP_RESP_OK;
+ if(is_cancelled_cb && is_cancelled_cb(is_cancelled_cb_data)) {
+ buffer_flush(wb);
+ response = HTTP_RESP_CLIENT_CLOSED_REQUEST;
+ }
- return HTTP_RESP_OK;
+ if(result_cb)
+ result_cb(wb, response, result_cb_data);
+
+ return response;
}