diff options
Diffstat (limited to 'database/rrdfunctions.c')
-rw-r--r-- | database/rrdfunctions.c | 755 |
1 files changed, 570 insertions, 185 deletions
diff --git a/database/rrdfunctions.c b/database/rrdfunctions.c index d32a4b8c9..6c5baf346 100644 --- a/database/rrdfunctions.c +++ b/database/rrdfunctions.c @@ -1,3 +1,4 @@ +// SPDX-License-Identifier: GPL-3.0-or-later #define NETDATA_RRD_INTERNALS #include "rrd.h" @@ -37,17 +38,17 @@ static unsigned char functions_allowed_chars[256] = { [30] = '_', // [31] = '_', // [32] = ' ', // SPACE keep - [33] = '_', // ! - [34] = '_', // " - [35] = '_', // # - [36] = '_', // $ - [37] = '_', // % - [38] = '_', // & - [39] = '_', // ' - [40] = '_', // ( - [41] = '_', // ) - [42] = '_', // * - [43] = '_', // + + [33] = '!', // ! keep + [34] = '"', // " keep + [35] = '#', // # keep + [36] = '$', // $ keep + [37] = '%', // % keep + [38] = '&', // & keep + [39] = '\'', // ' keep + [40] = '(', // ( keep + [41] = ')', // ) keep + [42] = '*', // * keep + [43] = '+', // + keep [44] = ',', // , keep [45] = '-', // - keep [46] = '.', // . keep @@ -63,12 +64,12 @@ static unsigned char functions_allowed_chars[256] = { [56] = '8', // 8 keep [57] = '9', // 9 keep [58] = ':', // : keep - [59] = ':', // ; convert ; to : - [60] = '_', // < - [61] = ':', // = convert = to : - [62] = '_', // > - [63] = '_', // ? - [64] = '_', // @ + [59] = ';', // ; keep + [60] = '<', // < keep + [61] = '=', // = keep + [62] = '>', // > keep + [63] = '?', // ? keep + [64] = '@', // @ keep [65] = 'A', // A keep [66] = 'B', // B keep [67] = 'C', // C keep @@ -95,12 +96,12 @@ static unsigned char functions_allowed_chars[256] = { [88] = 'X', // X keep [89] = 'Y', // Y keep [90] = 'Z', // Z keep - [91] = '_', // [ - [92] = '/', // backslash convert \ to / - [93] = '_', // ] - [94] = '_', // ^ + [91] = '[', // [ keep + [92] = '\\', // backslash keep + [93] = ']', // ] keep + [94] = '^', // ^ keep [95] = '_', // _ keep - [96] = '_', // ` + [96] = '`', // ` keep [97] = 'a', // a keep [98] = 'b', // b keep [99] = 'c', // c keep @@ -127,10 +128,10 @@ static unsigned char functions_allowed_chars[256] = { [120] = 'x', // x keep [121] = 'y', // y keep [122] = 'z', // z keep - [123] = '_', // { - [124] = '_', // | - [125] = '_', // } - [126] = '_', // ~ + [123] = '{', // { keep + [124] = '|', // | keep + [125] = '}', // } keep + [126] = '~', // ~ keep [127] = '_', // [128] = '_', // [129] = '_', // @@ -277,16 +278,15 @@ typedef enum __attribute__((packed)) { // this is 8-bit } RRD_FUNCTION_OPTIONS; -struct rrd_collector_function { +struct rrd_host_function { bool sync; // when true, the function is called synchronously RRD_FUNCTION_OPTIONS options; // RRD_FUNCTION_OPTIONS STRING *help; int timeout; // the default timeout of the function - int (*function)(BUFFER *wb, int timeout, const char *function, void *collector_data, - function_data_ready_callback callback, void *callback_data); + rrd_function_execute_cb_t execute_cb; - void *collector_data; + void *execute_cb_data; struct rrd_collector *collector; }; @@ -299,6 +299,7 @@ struct rrd_collector_function { struct rrd_collector { int32_t refcount; + int32_t refcount_canceller; pid_t tid; bool running; }; @@ -310,8 +311,11 @@ struct rrd_collector { static __thread struct rrd_collector *thread_rrd_collector = NULL; static void rrd_collector_free(struct rrd_collector *rdc) { + if(rdc->running) + return; + int32_t expected = 0; - if(likely(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST))) { + if(!__atomic_compare_exchange_n(&rdc->refcount, &expected, -1, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { // the collector is still referenced by charts. // leave it hanging there, the last chart will actually free it. return; @@ -323,11 +327,11 @@ static void rrd_collector_free(struct rrd_collector *rdc) { // called once per collector void rrd_collector_started(void) { - if(likely(thread_rrd_collector)) return; + if(!thread_rrd_collector) + thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); - thread_rrd_collector = callocz(1, sizeof(struct rrd_collector)); thread_rrd_collector->tid = gettid(); - thread_rrd_collector->running = true; + __atomic_store_n(&thread_rrd_collector->running, true, __ATOMIC_RELAXED); } // called once per collector @@ -335,65 +339,110 @@ void rrd_collector_finished(void) { if(!thread_rrd_collector) return; - thread_rrd_collector->running = false; + __atomic_store_n(&thread_rrd_collector->running, false, __ATOMIC_RELAXED); + + int32_t expected = 0; + while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount_canceller, &expected, -1, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) { + expected = 0; + sleep_usec(1 * USEC_PER_MS); + } + rrd_collector_free(thread_rrd_collector); thread_rrd_collector = NULL; } +#define rrd_collector_running(c) __atomic_load_n(&(c)->running, __ATOMIC_RELAXED) + static struct rrd_collector *rrd_collector_acquire(void) { - __atomic_add_fetch(&thread_rrd_collector->refcount, 1, __ATOMIC_SEQ_CST); + rrd_collector_started(); + + int32_t expected = __atomic_load_n(&thread_rrd_collector->refcount, __ATOMIC_RELAXED), wanted = 0; + do { + if(expected < 0 || !rrd_collector_running(thread_rrd_collector)) { + internal_fatal(true, "FUNCTIONS: Trying to acquire a collector that is exiting."); + return thread_rrd_collector; + } + + wanted = expected + 1; + + } while(!__atomic_compare_exchange_n(&thread_rrd_collector->refcount, &expected, wanted, false, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)); + return thread_rrd_collector; } static void rrd_collector_release(struct rrd_collector *rdc) { if(unlikely(!rdc)) return; - int32_t refcount = __atomic_sub_fetch(&rdc->refcount, 1, __ATOMIC_SEQ_CST); - if(refcount == 0 && !rdc->running) + int32_t expected = __atomic_load_n(&rdc->refcount, __ATOMIC_RELAXED), wanted = 0; + do { + if(expected < 0) { + internal_fatal(true, "FUNCTIONS: Trying to release a collector that is exiting."); + return; + } + + if(expected == 0) { + internal_fatal(true, "FUNCTIONS: Trying to release a collector that is not acquired."); + return; + } + + wanted = expected - 1; + + } while(!__atomic_compare_exchange_n(&rdc->refcount, &expected, wanted, false, __ATOMIC_RELEASE, __ATOMIC_RELAXED)); + + if(wanted == 0) rrd_collector_free(rdc); } -static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, - void *rrdhost __maybe_unused) { - struct rrd_collector_function *rdcf = func; - - if(!thread_rrd_collector) - fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.", - __FUNCTION__, dictionary_acquired_item_name(item)); +static void rrd_functions_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost) { + RRDHOST *host = rrdhost; (void)host; + struct rrd_host_function *rdcf = func; + rrd_collector_started(); rdcf->collector = rrd_collector_acquire(); + +// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s", +// dictionary_acquired_item_name(item), rrdhost_hostname(host), +// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running"); } -static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, +static void rrd_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *rrdhost __maybe_unused) { - struct rrd_collector_function *rdcf = func; + struct rrd_host_function *rdcf = func; rrd_collector_release(rdcf->collector); } -static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, - void *new_func __maybe_unused, void *rrdhost __maybe_unused) { - struct rrd_collector_function *rdcf = func; - struct rrd_collector_function *new_rdcf = new_func; +static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, + void *new_func, void *rrdhost) { + RRDHOST *host = rrdhost; (void)host; + struct rrd_host_function *rdcf = func; + struct rrd_host_function *new_rdcf = new_func; - if(!thread_rrd_collector) - fatal("RRDSET_COLLECTOR: called %s() for function '%s' without calling rrd_collector_started() first.", - __FUNCTION__, dictionary_acquired_item_name(item)); + rrd_collector_started(); bool changed = false; if(rdcf->collector != thread_rrd_collector) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed collector from %d to %d", + dictionary_acquired_item_name(item), rrdhost_hostname(host), rdcf->collector->tid, thread_rrd_collector->tid); + struct rrd_collector *old_rdc = rdcf->collector; rdcf->collector = rrd_collector_acquire(); rrd_collector_release(old_rdc); changed = true; } - if(rdcf->function != new_rdcf->function) { - rdcf->function = new_rdcf->function; + if(rdcf->execute_cb != new_rdcf->execute_cb) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + + rdcf->execute_cb = new_rdcf->execute_cb; changed = true; } if(rdcf->help != new_rdcf->help) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed help text", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + STRING *old = rdcf->help; rdcf->help = new_rdcf->help; string_freez(old); @@ -403,41 +452,53 @@ static bool rrd_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_ string_freez(new_rdcf->help); if(rdcf->timeout != new_rdcf->timeout) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed timeout", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + rdcf->timeout = new_rdcf->timeout; changed = true; } if(rdcf->sync != new_rdcf->sync) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed sync/async mode", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + rdcf->sync = new_rdcf->sync; changed = true; } - if(rdcf->collector_data != new_rdcf->collector_data) { - rdcf->collector_data = new_rdcf->collector_data; + if(rdcf->execute_cb_data != new_rdcf->execute_cb_data) { + netdata_log_info("FUNCTIONS: function '%s' of host '%s' changed execute callback data", + dictionary_acquired_item_name(item), rrdhost_hostname(host)); + + rdcf->execute_cb_data = new_rdcf->execute_cb_data; changed = true; } +// internal_error(true, "FUNCTIONS: adding function '%s' on host '%s', collection tid %d, %s", +// dictionary_acquired_item_name(item), rrdhost_hostname(host), +// rdcf->collector->tid, rdcf->collector->running ? "running" : "NOT running"); + return changed; } - -void rrdfunctions_init(RRDHOST *host) { +void rrdfunctions_host_init(RRDHOST *host) { if(host->functions) return; host->functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, - &dictionary_stats_category_functions, sizeof(struct rrd_collector_function)); + &dictionary_stats_category_functions, sizeof(struct rrd_host_function)); dictionary_register_insert_callback(host->functions, rrd_functions_insert_callback, host); dictionary_register_delete_callback(host->functions, rrd_functions_delete_callback, host); dictionary_register_conflict_callback(host->functions, rrd_functions_conflict_callback, host); } -void rrdfunctions_destroy(RRDHOST *host) { +void rrdfunctions_host_destroy(RRDHOST *host) { dictionary_destroy(host->functions); } -void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, - bool sync, function_execute_at_collector function, void *collector_data) { +void rrd_function_add(RRDHOST *host, RRDSET *st, const char *name, int timeout, const char *help, + bool sync, rrd_function_execute_cb_t execute_cb, void *execute_cb_data) { // RRDSET *st may be NULL in this function // to create a GLOBAL function @@ -448,18 +509,20 @@ void rrd_collector_add_function(RRDHOST *host, RRDSET *st, const char *name, int char key[PLUGINSD_LINE_MAX + 1]; sanitize_function_text(key, name, PLUGINSD_LINE_MAX); - struct rrd_collector_function tmp = { + struct rrd_host_function tmp = { .sync = sync, .timeout = timeout, .options = (st)?RRD_FUNCTION_LOCAL:RRD_FUNCTION_GLOBAL, - .function = function, - .collector_data = collector_data, + .execute_cb = execute_cb, + .execute_cb_data = execute_cb_data, .help = string_strdupz(help), }; const DICTIONARY_ITEM *item = dictionary_set_and_acquire_item(host->functions, key, &tmp, sizeof(tmp)); if(st) dictionary_view_set(st->functions_view, key, item); + else + rrdhost_flag_set(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED); dictionary_acquired_item_release(host->functions, item); } @@ -468,7 +531,7 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { if(!st->functions_view) return; - struct rrd_collector_function *tmp; + struct rrd_host_function *tmp; dfe_start_read(st->functions_view, tmp) { buffer_sprintf(wb , PLUGINSD_KEYWORD_FUNCTION " \"%s\" %d \"%s\"\n" @@ -481,7 +544,9 @@ void rrd_functions_expose_rrdpush(RRDSET *st, BUFFER *wb) { } void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) { - struct rrd_collector_function *tmp; + rrdhost_flag_clear(host, RRDHOST_FLAG_GLOBAL_FUNCTIONS_UPDATED); + + struct rrd_host_function *tmp; dfe_start_read(host->functions, tmp) { if(!(tmp->options & RRD_FUNCTION_GLOBAL)) continue; @@ -496,20 +561,6 @@ void rrd_functions_expose_global_rrdpush(RRDHOST *host, BUFFER *wb) { dfe_done(tmp); } -struct rrd_function_call_wait { - bool free_with_signal; - bool data_are_ready; - netdata_mutex_t mutex; - pthread_cond_t cond; - int code; -}; - -static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) { - pthread_cond_destroy(&tmp->cond); - netdata_mutex_destroy(&tmp->mutex); - freez(tmp); -} - struct { const char *format; HTTP_CONTENT_TYPE content_type; @@ -558,41 +609,171 @@ int rrd_call_function_error(BUFFER *wb, const char *msg, int code) { return code; } -static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, struct rrd_collector_function **rdcf) { +static int rrd_call_function_find(RRDHOST *host, BUFFER *wb, const char *name, size_t key_length, const DICTIONARY_ITEM **item) { char buffer[MAX_FUNCTION_LENGTH + 1]; strncpyz(buffer, name, MAX_FUNCTION_LENGTH); char *s = NULL; - *rdcf = NULL; - while(!(*rdcf) && buffer[0]) { - *rdcf = dictionary_get(host->functions, buffer); - if(*rdcf) break; + bool found = false; + *item = NULL; + if(host->functions) { + while (buffer[0]) { + if((*item = dictionary_get_and_acquire_item(host->functions, buffer))) { + found = true; + + struct rrd_host_function *rdcf = dictionary_acquired_item_value(*item); + if(rrd_collector_running(rdcf->collector)) { + break; + } + else { + dictionary_acquired_item_release(host->functions, *item); + *item = NULL; + } + } - // if s == NULL, set it to the end of the buffer - // this should happen only the first time - if(unlikely(!s)) - s = &buffer[key_length - 1]; + // if s == NULL, set it to the end of the buffer + // this should happen only the first time + if (unlikely(!s)) + s = &buffer[key_length - 1]; - // skip a word from the end - while(s >= buffer && !isspace(*s)) *s-- = '\0'; + // skip a word from the end + while (s >= buffer && !isspace(*s)) *s-- = '\0'; - // skip all spaces - while(s >= buffer && isspace(*s)) *s-- = '\0'; + // skip all spaces + while (s >= buffer && isspace(*s)) *s-- = '\0'; + } } buffer_flush(wb); - if(!(*rdcf)) - return rrd_call_function_error(wb, "No collector is supplying this function on this host at this time.", HTTP_RESP_NOT_FOUND); - - if(!(*rdcf)->collector->running) - return rrd_call_function_error(wb, "The collector that registered this function, is not currently running.", HTTP_RESP_BACKEND_FETCH_FAILED); + if(!(*item)) { + if(found) + return rrd_call_function_error(wb, + "The collector that registered this function, is not currently running.", + HTTP_RESP_SERVICE_UNAVAILABLE); + else + return rrd_call_function_error(wb, + "No collector is supplying this function on this host at this time.", + HTTP_RESP_NOT_FOUND); + } return HTTP_RESP_OK; } -static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) { +// ---------------------------------------------------------------------------- + +struct rrd_function_inflight { + bool used; + + RRDHOST *host; + const char *transaction; + const char *cmd; + const char *sanitized_cmd; + size_t sanitized_cmd_length; + int timeout; + bool cancelled; + + const DICTIONARY_ITEM *host_function_acquired; + + // the collector + // we acquire this structure at the beginning, + // and we release it at the end + struct rrd_host_function *rdcf; + + struct { + BUFFER *wb; + + // in async mode, + // the function to call to send the result back + rrd_function_result_callback_t cb; + void *data; + } result; + + struct { + // to be called in sync mode + // while the function is running + // to check if the function has been cancelled + rrd_function_is_cancelled_cb_t cb; + void *data; + } is_cancelled; + + struct { + // to be registered by the function itself + // used to signal the function to cancel + rrd_function_canceller_cb_t cb; + void *data; + } canceller; +}; + +static DICTIONARY *rrd_functions_inflight_requests = NULL; + +static void rrd_functions_inflight_delete_cb(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *data __maybe_unused) { + struct rrd_function_inflight *r = value; + + // internal_error(true, "FUNCTIONS: transaction '%s' finished", r->transaction); + + freez((void *)r->transaction); + freez((void *)r->cmd); + freez((void *)r->sanitized_cmd); + dictionary_acquired_item_release(r->host->functions, r->host_function_acquired); +} + +void rrd_functions_inflight_init(void) { + if(rrd_functions_inflight_requests) + return; + + rrd_functions_inflight_requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE, NULL, sizeof(struct rrd_function_inflight)); + + dictionary_register_delete_callback(rrd_functions_inflight_requests, rrd_functions_inflight_delete_cb, NULL); +} + +void rrd_functions_inflight_destroy(void) { + if(!rrd_functions_inflight_requests) + return; + + dictionary_destroy(rrd_functions_inflight_requests); + rrd_functions_inflight_requests = NULL; +} + +static void rrd_inflight_async_function_register_canceller_cb(void *register_canceller_cb_data, rrd_function_canceller_cb_t canceller_cb, void *canceller_cb_data) { + struct rrd_function_inflight *r = register_canceller_cb_data; + r->canceller.cb = canceller_cb; + r->canceller.data = canceller_cb_data; +} + +// ---------------------------------------------------------------------------- +// waiting for async function completion + +struct rrd_function_call_wait { + RRDHOST *host; + const DICTIONARY_ITEM *host_function_acquired; + char *transaction; + + bool free_with_signal; + bool data_are_ready; + netdata_mutex_t mutex; + pthread_cond_t cond; + int code; +}; + +static void rrd_inflight_function_cleanup(RRDHOST *host __maybe_unused, + const DICTIONARY_ITEM *host_function_acquired __maybe_unused, + const char *transaction) { + dictionary_del(rrd_functions_inflight_requests, transaction); + dictionary_garbage_collect(rrd_functions_inflight_requests); +} + +static void rrd_function_call_wait_free(struct rrd_function_call_wait *tmp) { + rrd_inflight_function_cleanup(tmp->host, tmp->host_function_acquired, tmp->transaction); + freez(tmp->transaction); + + pthread_cond_destroy(&tmp->cond); + netdata_mutex_destroy(&tmp->mutex); + freez(tmp); +} + +static void rrd_async_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, int code, void *callback_data) { struct rrd_function_call_wait *tmp = callback_data; bool we_should_free = false; @@ -618,115 +799,308 @@ static void rrd_call_function_signal_when_ready(BUFFER *temp_wb __maybe_unused, } } -int rrd_call_function_and_wait(RRDHOST *host, BUFFER *wb, int timeout, const char *name) { - int code; +static void rrd_inflight_async_function_nowait_finished(BUFFER *wb, int code, void *data) { + struct rrd_function_inflight *r = data; - struct rrd_collector_function *rdcf = NULL; + if(r->result.cb) + r->result.cb(wb, code, r->result.data); - char key[PLUGINSD_LINE_MAX + 1]; - size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX); - code = rrd_call_function_find(host, wb, key, key_length, &rdcf); - if(code != HTTP_RESP_OK) - return code; + rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction); +} - if(timeout <= 0) - timeout = rdcf->timeout; +static bool rrd_inflight_async_function_is_cancelled(void *data) { + struct rrd_function_inflight *r = data; + return __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED); +} - struct timespec tp; - clock_gettime(CLOCK_REALTIME, &tp); - tp.tv_sec += (time_t)timeout; +static inline int rrd_call_function_async_and_dont_wait(struct rrd_function_inflight *r) { + int code = r->rdcf->execute_cb(r->result.wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data, + rrd_inflight_async_function_nowait_finished, r, + rrd_inflight_async_function_is_cancelled, r, + rrd_inflight_async_function_register_canceller_cb, r); - if(rdcf->sync) { - code = rdcf->function(wb, timeout, key, rdcf->collector_data, NULL, NULL); + if(code != HTTP_RESP_OK) { + if (!buffer_strlen(r->result.wb)) + rrd_call_function_error(r->result.wb, "Failed to send request to the collector.", code); + + rrd_inflight_function_cleanup(r->host, r->host_function_acquired, r->transaction); } - else { - struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait)); - tmp->free_with_signal = false; - tmp->data_are_ready = false; - netdata_mutex_init(&tmp->mutex); - pthread_cond_init(&tmp->cond, NULL); - - bool we_should_free = true; - BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it - temp_wb->content_type = wb->content_type; - code = rdcf->function(temp_wb, timeout, key, rdcf->collector_data, rrd_call_function_signal_when_ready, tmp); - if (code == HTTP_RESP_OK) { - netdata_mutex_lock(&tmp->mutex); - - int rc = 0; - while (rc == 0 && !tmp->data_are_ready) { - // the mutex is unlocked within pthread_cond_timedwait() - rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp); - // the mutex is again ours - } - if (tmp->data_are_ready) { - // we have a response - buffer_fast_strcat(wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb)); - wb->content_type = temp_wb->content_type; - wb->expires = temp_wb->expires; + return code; +} - if(wb->expires) - buffer_cacheable(wb); - else - buffer_no_cacheable(wb); +static int rrd_call_function_async_and_wait(struct rrd_function_inflight *r) { + struct timespec tp; + clock_gettime(CLOCK_REALTIME, &tp); + usec_t now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC; + usec_t end_ut = now_ut + r->timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT; + + struct rrd_function_call_wait *tmp = mallocz(sizeof(struct rrd_function_call_wait)); + tmp->free_with_signal = false; + tmp->data_are_ready = false; + tmp->host = r->host; + tmp->host_function_acquired = r->host_function_acquired; + tmp->transaction = strdupz(r->transaction); + netdata_mutex_init(&tmp->mutex); + pthread_cond_init(&tmp->cond, NULL); + + // we need a temporary BUFFER, because we may time out and the caller supplied one may vanish + // so, we create a new one we guarantee will survive until the collector finishes... + + bool we_should_free = true; + BUFFER *temp_wb = buffer_create(PLUGINSD_LINE_MAX + 1, &netdata_buffers_statistics.buffers_functions); // we need it because we may give up on it + temp_wb->content_type = r->result.wb->content_type; + + int code = r->rdcf->execute_cb(temp_wb, r->timeout, r->sanitized_cmd, r->rdcf->execute_cb_data, + // we overwrite the result callbacks, + // so that we can clean up the allocations made + rrd_async_function_signal_when_ready, tmp, + rrd_inflight_async_function_is_cancelled, r, + rrd_inflight_async_function_register_canceller_cb, r); + + if (code == HTTP_RESP_OK) { + netdata_mutex_lock(&tmp->mutex); + + bool cancelled = false; + int rc = 0; + while (rc == 0 && !cancelled && !tmp->data_are_ready) { + clock_gettime(CLOCK_REALTIME, &tp); + now_ut = tp.tv_sec * USEC_PER_SEC + tp.tv_nsec / NSEC_PER_USEC; + + if(now_ut >= end_ut) { + rc = ETIMEDOUT; + break; + } - code = tmp->code; + tp.tv_nsec += 10 * NSEC_PER_MSEC; + if(tp.tv_nsec > (long)(1 * NSEC_PER_SEC)) { + tp.tv_sec++; + tp.tv_nsec -= 1 * NSEC_PER_SEC; } - else if (rc == ETIMEDOUT) { - // timeout - // we will go away and let the callback free the structure - tmp->free_with_signal = true; - we_should_free = false; - code = rrd_call_function_error(wb, "Timeout while waiting for a response from the collector.", HTTP_RESP_GATEWAY_TIMEOUT); + + // the mutex is unlocked within pthread_cond_timedwait() + rc = pthread_cond_timedwait(&tmp->cond, &tmp->mutex, &tp); + // the mutex is again ours + + if(rc == ETIMEDOUT) { + rc = 0; + if (!tmp->data_are_ready && r->is_cancelled.cb && + r->is_cancelled.cb(r->is_cancelled.data)) { +// internal_error(true, "FUNCTIONS: transaction '%s' is cancelled while waiting for response", +// r->transaction); + rc = 0; + cancelled = true; + rrd_function_cancel(r->transaction); + break; + } } + } + + if (tmp->data_are_ready) { + // we have a response + buffer_fast_strcat(r->result.wb, buffer_tostring(temp_wb), buffer_strlen(temp_wb)); + r->result.wb->content_type = temp_wb->content_type; + r->result.wb->expires = temp_wb->expires; + + if(r->result.wb->expires) + buffer_cacheable(r->result.wb); else - code = rrd_call_function_error(wb, "Failed to get the response from the collector.", HTTP_RESP_INTERNAL_SERVER_ERROR); + buffer_no_cacheable(r->result.wb); - netdata_mutex_unlock(&tmp->mutex); + code = tmp->code; } - else { - if(!buffer_strlen(wb)) - rrd_call_function_error(wb, "Failed to send request to the collector.", code); + else if (rc == ETIMEDOUT || cancelled) { + // timeout + // we will go away and let the callback free the structure + tmp->free_with_signal = true; + we_should_free = false; + + if(cancelled) + code = rrd_call_function_error(r->result.wb, + "Request cancelled", + HTTP_RESP_CLIENT_CLOSED_REQUEST); + else + code = rrd_call_function_error(r->result.wb, + "Timeout while waiting for a response from the collector.", + HTTP_RESP_GATEWAY_TIMEOUT); } + else + code = rrd_call_function_error(r->result.wb, + "Internal error while communicating with the collector", + HTTP_RESP_INTERNAL_SERVER_ERROR); - if (we_should_free) { - rrd_function_call_wait_free(tmp); - buffer_free(temp_wb); - } + netdata_mutex_unlock(&tmp->mutex); + } + else { + if(!buffer_strlen(r->result.wb)) + rrd_call_function_error(r->result.wb, "The collector returned an error.", code); + } + + if (we_should_free) { + rrd_function_call_wait_free(tmp); + buffer_free(temp_wb); } return code; } -int rrd_call_function_async(RRDHOST *host, BUFFER *wb, int timeout, const char *name, - rrd_call_function_async_callback callback, void *callback_data) { +static inline int rrd_call_function_async(struct rrd_function_inflight *r, bool wait) { + if(wait) + return rrd_call_function_async_and_wait(r); + else + return rrd_call_function_async_and_dont_wait(r); +} + + +void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data); +// ---------------------------------------------------------------------------- + +int rrd_function_run(RRDHOST *host, BUFFER *result_wb, int timeout, const char *cmd, + bool wait, const char *transaction, + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data, const char *payload) { + int code; + char sanitized_cmd[PLUGINSD_LINE_MAX + 1]; + const DICTIONARY_ITEM *host_function_acquired = NULL; - struct rrd_collector_function *rdcf = NULL; - char key[PLUGINSD_LINE_MAX + 1]; - size_t key_length = sanitize_function_text(key, name, PLUGINSD_LINE_MAX); - code = rrd_call_function_find(host, wb, key, key_length, &rdcf); + // ------------------------------------------------------------------------ + // find the function + + size_t sanitized_cmd_length = sanitize_function_text(sanitized_cmd, cmd, PLUGINSD_LINE_MAX); + + if (is_dyncfg_function(sanitized_cmd, DYNCFG_FUNCTION_TYPE_ALL)) { + call_virtual_function_async(result_wb, host, sanitized_cmd, payload, result_cb, result_cb_data); + return HTTP_RESP_OK; + } + + code = rrd_call_function_find(host, result_wb, sanitized_cmd, sanitized_cmd_length, &host_function_acquired); if(code != HTTP_RESP_OK) return code; + struct rrd_host_function *rdcf = dictionary_acquired_item_value(host_function_acquired); + if(timeout <= 0) timeout = rdcf->timeout; - code = rdcf->function(wb, timeout, key, rdcf->collector_data, callback, callback_data); - if(code != HTTP_RESP_OK) { - if (!buffer_strlen(wb)) - rrd_call_function_error(wb, "Failed to send request to the collector.", code); + // ------------------------------------------------------------------------ + // the function can only be executed in sync mode + + if(rdcf->sync) { + // the caller has to wait + + code = rdcf->execute_cb(result_wb, timeout, sanitized_cmd, rdcf->execute_cb_data, + NULL, NULL, // no callback needed, it is synchronous + is_cancelled_cb, is_cancelled_cb_data, // it is ok to pass these, we block the caller + NULL, NULL); // no need to pass, we will wait + + if (code != HTTP_RESP_OK && !buffer_strlen(result_wb)) + rrd_call_function_error(result_wb, "Collector reported error.", code); + + dictionary_acquired_item_release(host->functions, host_function_acquired); + return code; } - return code; + + // ------------------------------------------------------------------------ + // the function can only be executed in async mode + // put the function into the inflight requests + + char uuid_str[UUID_STR_LEN]; + if(!transaction) { + uuid_t uuid; + uuid_generate_random(uuid); + uuid_unparse_lower(uuid, uuid_str); + transaction = uuid_str; + } + + // put the request into the inflight requests + struct rrd_function_inflight t = { + .used = false, + .host = host, + .cmd = strdupz(cmd), + .sanitized_cmd = strdupz(sanitized_cmd), + .sanitized_cmd_length = sanitized_cmd_length, + .transaction = strdupz(transaction), + .timeout = timeout, + .cancelled = false, + .host_function_acquired = host_function_acquired, + .rdcf = rdcf, + .result = { + .wb = result_wb, + .cb = result_cb, + .data = result_cb_data, + }, + .is_cancelled = { + .cb = is_cancelled_cb, + .data = is_cancelled_cb_data, + } + }; + struct rrd_function_inflight *r = dictionary_set(rrd_functions_inflight_requests, transaction, &t, sizeof(t)); + if(r->used) { + netdata_log_info("FUNCTIONS: duplicate transaction '%s', function: '%s'", t.transaction, t.cmd); + code = rrd_call_function_error(result_wb, "duplicate transaction", HTTP_RESP_BAD_REQUEST); + freez((void *)t.transaction); + freez((void *)t.cmd); + freez((void *)t.sanitized_cmd); + dictionary_acquired_item_release(r->host->functions, t.host_function_acquired); + return code; + } + r->used = true; + // internal_error(true, "FUNCTIONS: transaction '%s' started", r->transaction); + + return rrd_call_function_async(r, wait); } +void rrd_function_cancel(const char *transaction) { + // internal_error(true, "FUNCTIONS: request to cancel transaction '%s'", transaction); + + const DICTIONARY_ITEM *item = dictionary_get_and_acquire_item(rrd_functions_inflight_requests, transaction); + if(!item) { + netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the transaction is not running.", + transaction); + return; + } + + struct rrd_function_inflight *r = dictionary_acquired_item_value(item); + + bool cancelled = __atomic_load_n(&r->cancelled, __ATOMIC_RELAXED); + if(cancelled) { + netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but it is already cancelled.", + transaction); + goto cleanup; + } + + __atomic_store_n(&r->cancelled, true, __ATOMIC_RELAXED); + + int32_t expected = __atomic_load_n(&r->rdcf->collector->refcount_canceller, __ATOMIC_RELAXED); + int32_t wanted; + do { + if(expected < 0) { + netdata_log_info("FUNCTIONS: received a cancel request for transaction '%s', but the collector is not running.", + transaction); + goto cleanup; + } + + wanted = expected + 1; + } while(!__atomic_compare_exchange_n(&r->rdcf->collector->refcount_canceller, &expected, wanted, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)); + + if(r->canceller.cb) + r->canceller.cb(r->canceller.data); + + __atomic_sub_fetch(&r->rdcf->collector->refcount_canceller, 1, __ATOMIC_RELAXED); + +cleanup: + dictionary_acquired_item_release(rrd_functions_inflight_requests, item); +} + +// ---------------------------------------------------------------------------- + static void functions2json(DICTIONARY *functions, BUFFER *wb, const char *ident, const char *kq, const char *sq) { - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(functions, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; if(t_dfe.counter) buffer_strcat(wb, ",\n"); @@ -759,9 +1133,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) { buffer_json_member_add_object(wb, "functions"); - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(host->functions, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; buffer_json_member_add_object(wb, t_dfe.name); buffer_json_member_add_string(wb, "help", string2str(t->help)); @@ -782,9 +1156,9 @@ void host_functions2json(RRDHOST *host, BUFFER *wb) { void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void *value, size_t value_size) { if(!rrdset_functions_view || !dst) return; - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(rrdset_functions_view, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; dictionary_set(dst, t_dfe.name, value, value_size); } @@ -794,9 +1168,9 @@ void chart_functions_to_dict(DICTIONARY *rrdset_functions_view, DICTIONARY *dst, void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t value_size, STRING **help) { if(!host || !host->functions || !dictionary_entries(host->functions) || !dst) return; - struct rrd_collector_function *t; + struct rrd_host_function *t; dfe_start_read(host->functions, t) { - if(!t->collector->running) continue; + if(!rrd_collector_running(t->collector)) continue; if(help) *help = t->help; @@ -806,10 +1180,15 @@ void host_functions_to_dict(RRDHOST *host, DICTIONARY *dst, void *value, size_t dfe_done(t); } +// ---------------------------------------------------------------------------- int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const char *function __maybe_unused, void *collector_data __maybe_unused, - function_data_ready_callback callback __maybe_unused, void *callback_data __maybe_unused) { + rrd_function_result_callback_t result_cb, void *result_cb_data, + rrd_function_is_cancelled_cb_t is_cancelled_cb, void *is_cancelled_cb_data, + rrd_function_register_canceller_cb_t register_canceller_cb __maybe_unused, + void *register_canceller_cb_data __maybe_unused) { + time_t now = now_realtime_sec(); buffer_flush(wb); @@ -1428,8 +1807,14 @@ int rrdhost_function_streaming(BUFFER *wb, int timeout __maybe_unused, const cha buffer_json_member_add_time_t(wb, "expires", now_realtime_sec() + 1); buffer_json_finalize(wb); - if(callback) - callback(wb, HTTP_RESP_OK, callback_data); + int response = HTTP_RESP_OK; + if(is_cancelled_cb && is_cancelled_cb(is_cancelled_cb_data)) { + buffer_flush(wb); + response = HTTP_RESP_CLIENT_CLOSED_REQUEST; + } - return HTTP_RESP_OK; + if(result_cb) + result_cb(wb, response, result_cb_data); + + return response; } |